Optimizations to thread pool. Changed call queues thread
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /* Explanation of the thread pool execution.
23
24    When new call is added to thread pool by calling silc_thread_pool_run
25    it is assigned to a first free thread from the free list.  If no threads
26    are available we take one from the threads list and assign the call to
27    its queue.  The threads list always takes different thread finally wrapping
28    from the beginning.  This way each thread will get a chance to execute
29    queued calls.
30
31    The thread function silc_thread_pool_run_thread executes each call.  After
32    executing the current call that has been assigned to it, it will check
33    if there are any queued calls in its queue, and it will execute all calls
34    from the queue.  If there aren't any calls in the queue, it will attempt
35    to steal a call from some other thread and execute it.
36
37    The queue list is always consumed in last-in-first-out order.  The most
38    recently added call gets priority.  With full utilization this helps to
39    avoid CPU cache misses.  Since the queues are thread specific with full
40    utilization each thread should always be doing work for the most recent
41    (and thus most important) calls. */
42
43 /************************** Types and definitions ***************************/
44
45 /* Thread pool thread context.  Each thread contains the most current call
46    to be executed, and a list of queued calls. */
47 typedef struct SilcThreadPoolThreadStruct {
48   struct SilcThreadPoolThreadStruct *next;
49   struct SilcThreadPoolThreadStruct *next2;
50   SilcThreadPool tp;                /* The thread pool */
51   SilcMutex lock;                   /* Thread lock */
52   SilcList queue;                   /* Queue for waiting calls */
53   SilcList free_queue;              /* Queue freelist */
54   SilcSchedule schedule;            /* The current Scheduler, may be NULL */
55   SilcThreadPoolFunc run;           /* The current call to run in a thread */
56   SilcTaskCallback completion;      /* The current Completion function */
57   void *run_context;
58   void *completion_context;
59   unsigned int stop        : 1;     /* Set to stop the thread */
60 } *SilcThreadPoolThread;
61
62 /* Thread pool context */
63 struct SilcThreadPoolStruct {
64   SilcStack stack;                  /* Stack for memory allocation */
65   SilcMutex lock;                   /* Pool lock */
66   SilcCond pool_signal;             /* Condition variable for signalling */
67   SilcList threads;                 /* Threads in the pool */
68   SilcList free_threads;            /* Threads freelist */
69   SilcUInt16 min_threads;           /* Minimum threads in the pool */
70   SilcUInt16 max_threads;           /* Maximum threads in the pool */
71   SilcUInt16 refcnt;                /* Reference counter */
72   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
73 };
74
75 /************************ Static utility functions **************************/
76
77 /* Reference thread pool.  Must be called locked. */
78
79 static void silc_thread_pool_ref(SilcThreadPool tp)
80 {
81   tp->refcnt++;
82   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
83                   tp->refcnt));
84 }
85
86 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
87
88 static void silc_thread_pool_unref(SilcThreadPool tp)
89 {
90   tp->refcnt--;
91   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
92                   tp->refcnt));
93   if (!tp->refcnt) {
94     SilcStack stack = tp->stack;
95     silc_mutex_unlock(tp->lock);
96     silc_mutex_free(tp->lock);
97     silc_cond_free(tp->pool_signal);
98     silc_sfree(stack, tp);
99     silc_stack_free(stack);
100     return;
101   }
102   silc_mutex_unlock(tp->lock);
103 }
104
105 /* The thread executor.  Each thread in the pool is run here.  They wait
106    here for something to do which is given to them by silc_thread_pool_run. */
107
108 static void *silc_thread_pool_run_thread(void *context)
109 {
110   SilcThreadPoolThread t = context, o, q;
111   SilcThreadPool tp = t->tp;
112   SilcMutex lock = tp->lock;
113   SilcCond pool_signal = tp->pool_signal;
114
115   silc_mutex_lock(lock);
116
117   while (1) {
118     /* Wait here for code to execute */
119     while (!t->run && !t->stop)
120       silc_cond_wait(pool_signal, lock);
121
122     if (silc_unlikely(t->stop)) {
123       /* Stop the thread.  Remove from threads list and free memory. */
124       SILC_LOG_DEBUG(("Stop thread %p", t));
125       silc_list_del(tp->threads, t);
126       silc_list_start(tp->threads);
127
128       /* Clear thread's call queue. */
129       silc_list_start(t->queue);
130       silc_list_start(t->free_queue);
131       while ((q = silc_list_get(t->queue)))
132         silc_sfree(tp->stack, q);
133       while ((q = silc_list_get(t->free_queue)))
134         silc_sfree(tp->stack, q);
135
136       silc_mutex_free(t->lock);
137       silc_sfree(tp->stack, t);
138
139       /* If we are last thread, signal the waiting destructor. */
140       if (silc_list_count(tp->threads) == 0)
141         silc_cond_signal(pool_signal);
142
143       /* Release pool reference.  Releases lock also. */
144       silc_thread_pool_unref(tp);
145       break;
146     }
147     silc_mutex_unlock(lock);
148
149     /* Execute code */
150   execute:
151     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
152                     t->run_context, t));
153     t->run(t->schedule, t->run_context);
154
155     /* If scheduler is NULL, call completion directly from here.  Otherwise
156        it is called through the scheduler in the thread where the scheduler
157        is running. */
158     if (t->completion) {
159       if (t->schedule) {
160         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
161         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
162                                             t->completion_context, 0, 0)) {
163           SILC_LOG_DEBUG(("Run completion directly"));
164           t->completion(NULL, NULL, 0, 0, t->completion_context);
165         }
166         silc_schedule_wakeup(t->schedule);
167       } else {
168         SILC_LOG_DEBUG(("Run completion directly"));
169         t->completion(NULL, NULL, 0, 0, t->completion_context);
170       }
171     }
172
173     /* Check if there are calls in queue.  Takes the most recently added
174        call since new ones are added at the start of the list. */
175     silc_mutex_lock(t->lock);
176     if (silc_list_count(t->queue) > 0) {
177       silc_list_start(t->queue);
178       q = silc_list_get(t->queue);
179
180       SILC_LOG_DEBUG(("Execute call from queue"));
181
182       /* Execute this call now */
183       t->run = q->run;
184       t->run_context = q->run_context;
185       t->completion = q->completion;
186       t->completion_context = q->completion_context;
187       t->schedule = q->schedule;
188
189       silc_list_del(t->queue, q);
190       silc_list_add(t->free_queue, q);
191       silc_mutex_unlock(t->lock);
192       goto execute;
193     } else {
194       silc_mutex_unlock(t->lock);
195
196       /* Nothing to do.  Attempt to steal call from some other thread. */
197       silc_mutex_lock(lock);
198       o = silc_list_get(tp->threads);
199       if (!o) {
200         /* List wraps around */
201         silc_list_start(tp->threads);
202         o = silc_list_get(tp->threads);
203       }
204       silc_mutex_unlock(lock);
205
206       if (o && o != t) {
207         silc_mutex_lock(o->lock);
208         if (silc_list_count(o->queue) > 0) {
209           silc_list_start(o->queue);
210           q = silc_list_get(o->queue);
211
212           SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
213
214           /* Execute this call now */
215           t->run = q->run;
216           t->run_context = q->run_context;
217           t->completion = q->completion;
218           t->completion_context = q->completion_context;
219           t->schedule = q->schedule;
220
221           silc_list_del(o->queue, q);
222           silc_list_add(o->free_queue, q);
223           silc_mutex_unlock(o->lock);
224           goto execute;
225         }
226         silc_mutex_unlock(o->lock);
227       }
228     }
229
230     /* The thread is now free for use again. */
231     t->run = NULL;
232     t->completion = NULL;
233     t->schedule = NULL;
234
235     silc_mutex_lock(lock);
236     silc_list_add(tp->free_threads, t);
237   }
238
239   return NULL;
240 }
241
242 /* Creates new thread to thread pool */
243
244 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
245 {
246   SilcThreadPoolThread t;
247
248   t = silc_scalloc(tp->stack, 1, sizeof(*t));
249   if (!t)
250     return NULL;
251
252   if (!silc_mutex_alloc(&t->lock)) {
253     silc_sfree(tp->stack, t);
254     return NULL;
255   }
256
257   t->tp = tp;
258   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
259   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
260
261   /* Add to thread pool */
262   silc_list_add(tp->threads, t);
263   silc_list_add(tp->free_threads, t);
264   silc_thread_pool_ref(tp);
265
266   SILC_LOG_DEBUG(("Start thread %p", t));
267
268   /* Start the thread */
269   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
270
271   return t;
272 }
273
274 /**************************** Thread Pool API *******************************/
275
276 /* Allocate thread pool */
277
278 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
279                                       SilcUInt32 min_threads,
280                                       SilcUInt32 max_threads,
281                                       SilcBool start_min_threads)
282 {
283   SilcThreadPool tp;
284   int i;
285
286   if (max_threads < min_threads)
287     return NULL;
288   if (!max_threads)
289     return NULL;
290
291   if (stack)
292     stack = silc_stack_alloc(0, stack);
293
294   tp = silc_scalloc(stack, 1, sizeof(*tp));
295   if (!tp) {
296     silc_stack_free(stack);
297     return NULL;
298   }
299
300   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
301                   tp, min_threads, max_threads));
302
303   tp->stack = stack;
304   tp->min_threads = min_threads;
305   tp->max_threads = max_threads;
306   tp->refcnt++;
307
308   if (!silc_mutex_alloc(&tp->lock)) {
309     silc_sfree(stack, tp);
310     silc_stack_free(stack);
311     return NULL;
312   }
313
314   if (!silc_cond_alloc(&tp->pool_signal)) {
315     silc_mutex_free(tp->lock);
316     silc_sfree(stack, tp);
317     silc_stack_free(stack);
318     return NULL;
319   }
320
321   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
322   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
323
324   for (i = 0; i < tp->min_threads && start_min_threads; i++)
325     silc_thread_pool_new_thread(tp);
326
327   silc_list_start(tp->threads);
328
329   return tp;
330 }
331
332 /* Free thread pool */
333
334 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
335 {
336   SilcThreadPoolThread t;
337
338   SILC_LOG_DEBUG(("Free thread pool %p", tp));
339
340   silc_mutex_lock(tp->lock);
341   tp->destroy = TRUE;
342
343   /* Stop threads */
344   silc_list_start(tp->threads);
345   while ((t = silc_list_get(tp->threads)))
346     t->stop = TRUE;
347   silc_cond_broadcast(tp->pool_signal);
348
349   if (wait_unfinished) {
350     SILC_LOG_DEBUG(("Wait threads to finish"));
351     while (silc_list_count(tp->threads))
352       silc_cond_wait(tp->pool_signal, tp->lock);
353   }
354
355   /* Release reference.  Releases lock also. */
356   silc_thread_pool_unref(tp);
357 }
358
359 /* Execute code in a thread in the pool */
360
361 SilcBool silc_thread_pool_run(SilcThreadPool tp,
362                               SilcBool queuable,
363                               SilcSchedule schedule,
364                               SilcThreadPoolFunc run,
365                               void *run_context,
366                               SilcTaskCallback completion,
367                               void *completion_context)
368 {
369   SilcThreadPoolThread t, q;
370
371   silc_mutex_lock(tp->lock);
372
373   if (tp->destroy) {
374     silc_mutex_unlock(tp->lock);
375     return FALSE;
376   }
377
378   /* Get free thread */
379   silc_list_start(tp->free_threads);
380   t = silc_list_get(tp->free_threads);
381   if (!t) {
382     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
383       /* Maximum threads reached */
384       if (!queuable) {
385         silc_mutex_unlock(tp->lock);
386         return FALSE;
387       }
388
389       /* User wants to queue this call until thread becomes free.  Get
390          a thread to assign this call. */
391       t = silc_list_get(tp->threads);
392       if (!t) {
393         /* List wraps around */
394         silc_list_start(tp->threads);
395         t = silc_list_get(tp->threads);
396       }
397
398       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
399                       run, run_context, t));
400
401       /* Lock the thread.  Keep also thread pool lock so that this thread
402          cannot become free while we're adding call to its queue. */
403       silc_mutex_lock(t->lock);
404
405       /* Get free call context from the list */
406       silc_list_start(t->free_queue);
407       q = silc_list_get(t->free_queue);
408       if (!q) {
409         q = silc_scalloc(tp->stack, 1, sizeof(*q));
410         if (!q) {
411           silc_mutex_unlock(t->lock);
412           silc_mutex_unlock(tp->lock);
413           return FALSE;
414         }
415       } else {
416         silc_list_del(t->free_queue, q);
417       }
418
419       q->run = run;
420       q->run_context = run_context;
421       q->completion = completion;
422       q->completion_context = completion_context;
423       q->schedule = schedule;
424
425       /* Add at the start of the list.  It gets executed first. */
426       silc_list_insert(t->queue, NULL, q);
427       silc_mutex_unlock(t->lock);
428       silc_mutex_unlock(tp->lock);
429       return TRUE;
430     } else {
431       /* Create new thread */
432       t = silc_thread_pool_new_thread(tp);
433       if (!t) {
434         silc_mutex_unlock(tp->lock);
435         return FALSE;
436       }
437     }
438   }
439
440   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
441
442   /* Mark this call to be executed in this thread */
443   t->run = run;
444   t->run_context = run_context;
445   t->completion = completion;
446   t->completion_context = completion_context;
447   t->schedule = schedule;
448   silc_list_del(tp->free_threads, t);
449
450   /* Signal threads */
451   silc_cond_broadcast(tp->pool_signal);
452
453   silc_mutex_unlock(tp->lock);
454   return TRUE;
455 }
456
457 /* Set maximum threads in the pool */
458
459 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
460                                       SilcUInt32 max_threads)
461 {
462   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
463
464   silc_mutex_lock(tp->lock);
465   tp->max_threads = max_threads;
466   silc_mutex_unlock(tp->lock);
467 }
468
469 /* Get maximum threads in the pool */
470
471 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
472 {
473   SilcUInt32 max_threads;
474
475   silc_mutex_lock(tp->lock);
476   max_threads = tp->max_threads;
477   silc_mutex_unlock(tp->lock);
478
479   return max_threads;
480 }
481
482 /* Get numnber of free threads in the pool */
483
484 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
485 {
486   SilcUInt32 free_threads;
487
488   silc_mutex_lock(tp->lock);
489   free_threads = silc_list_count(tp->free_threads);
490   silc_mutex_unlock(tp->lock);
491
492   return free_threads;
493 }
494
495 /* Purge pool */
496
497 void silc_thread_pool_purge(SilcThreadPool tp)
498 {
499   SilcThreadPoolThread t;
500   int i;
501
502   silc_mutex_lock(tp->lock);
503
504   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
505     SILC_LOG_DEBUG(("No threads to purge"));
506     silc_mutex_unlock(tp->lock);
507     return;
508   }
509
510   i = silc_list_count(tp->free_threads) - tp->min_threads;
511
512   SILC_LOG_DEBUG(("Purge %d threads", i));
513
514   silc_list_start(tp->threads);
515   while ((t = silc_list_get(tp->threads))) {
516     silc_mutex_lock(t->lock);
517     if (t->run) {
518       silc_mutex_unlock(t->lock);
519       continue;
520     }
521
522     t->stop = TRUE;
523     silc_mutex_unlock(t->lock);
524
525     silc_list_del(tp->free_threads, t);
526
527     i--;
528     if (!i)
529       break;
530   }
531
532   /* Signal threads to stop */
533   silc_cond_broadcast(tp->pool_signal);
534
535   silc_list_start(tp->threads);
536   silc_mutex_unlock(tp->lock);
537 }