794259c78dc11eab00bd71d39679053c0dabc988
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /* Explanation of the thread pool execution.
23
24    When new call is added to thread pool by calling silc_thread_pool_run
25    it is assigned to a first free thread from the free list.  If no threads
26    are available we take one from the threads list and assign the call to
27    its queue.  The threads list always takes different thread finally wrapping
28    from the beginning.  This way each thread will get a chance to execute
29    queued calls.
30
31    The thread function silc_thread_pool_run_thread executes each call.  After
32    executing the current call that has been assigned to it, it will check
33    if there are any queued calls in its queue, and it will execute all calls
34    from the queue.  If there aren't any calls in the queue, it will attempt
35    to steal a call from some other thread and execute it.
36
37    The queue list is always consumed in last-in-first-out order.  The most
38    recently added call gets priority.  With full utilization this helps to
39    avoid CPU cache misses.  Since the queues are thread specific with full
40    utilization each thread should always be doing work for the most recent
41    (and thus most important) calls. */
42
43 /************************** Types and definitions ***************************/
44
45 /* Thread pool thread context.  Each thread contains the most current call
46    to be executed, and a list of queued calls. */
47 typedef struct SilcThreadPoolThreadStruct {
48   struct SilcThreadPoolThreadStruct *next;
49   struct SilcThreadPoolThreadStruct *next2;
50   SilcThreadPool tp;                /* The thread pool */
51   SilcCond thread_signal;           /* Condition variable for signalling */
52   SilcMutex lock;                   /* Thread lock */
53   SilcList queue;                   /* Queue for waiting calls */
54   SilcList free_queue;              /* Queue freelist */
55   SilcSchedule schedule;            /* The current Scheduler, may be NULL */
56   SilcThreadPoolFunc run;           /* The current call to run in a thread */
57   SilcTaskCallback completion;      /* The current Completion function */
58   void *run_context;
59   void *completion_context;
60   unsigned int stop        : 1;     /* Set to stop the thread */
61 } *SilcThreadPoolThread;
62
63 /* Thread pool context */
64 struct SilcThreadPoolStruct {
65   SilcStack stack;                  /* Stack for memory allocation */
66   SilcCond pool_signal;             /* Condition variable for signalling */
67   SilcMutex lock;                   /* Pool lock */
68   SilcList threads;                 /* Threads in the pool */
69   SilcList free_threads;            /* Threads freelist */
70   SilcUInt16 min_threads;           /* Minimum threads in the pool */
71   SilcUInt16 max_threads;           /* Maximum threads in the pool */
72   SilcUInt16 refcnt;                /* Reference counter */
73   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
74 };
75
76 /************************ Static utility functions **************************/
77
78 /* Reference thread pool.  Must be called locked. */
79
80 static void silc_thread_pool_ref(SilcThreadPool tp)
81 {
82   tp->refcnt++;
83   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
84                   tp->refcnt));
85 }
86
87 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
88
89 static void silc_thread_pool_unref(SilcThreadPool tp)
90 {
91   tp->refcnt--;
92   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
93                   tp->refcnt));
94   if (!tp->refcnt) {
95     SilcStack stack = tp->stack;
96     silc_mutex_unlock(tp->lock);
97     silc_mutex_free(tp->lock);
98     silc_cond_free(tp->pool_signal);
99     silc_sfree(stack, tp);
100     silc_stack_free(stack);
101     return;
102   }
103   silc_mutex_unlock(tp->lock);
104 }
105
106 /* The thread executor.  Each thread in the pool is run here.  They wait
107    here for something to do which is given to them by silc_thread_pool_run. */
108
109 static void *silc_thread_pool_run_thread(void *context)
110 {
111   SilcThreadPoolThread t = context, o, q;
112   SilcThreadPool tp = t->tp;
113   SilcMutex lock = t->lock;
114   SilcCond thread_signal = t->thread_signal;
115
116   silc_mutex_lock(lock);
117
118   while (1) {
119     /* Wait here for code to execute */
120     while (!t->run && !t->stop)
121       silc_cond_wait(thread_signal, lock);
122
123     if (t->stop)
124       goto stop;
125
126     /* Execute code */
127     silc_mutex_unlock(lock);
128   execute:
129     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
130                     t->run_context, t));
131     t->run(t->schedule, t->run_context);
132
133     /* If scheduler is NULL, call completion directly from here.  Otherwise
134        it is called through the scheduler in the thread where the scheduler
135        is running. */
136     if (t->completion) {
137       if (t->schedule) {
138         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
139         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
140                                             t->completion_context, 0, 0)) {
141           SILC_LOG_DEBUG(("Run completion directly"));
142           t->completion(NULL, NULL, 0, 0, t->completion_context);
143         }
144         silc_schedule_wakeup(t->schedule);
145       } else {
146         SILC_LOG_DEBUG(("Run completion directly"));
147         t->completion(NULL, NULL, 0, 0, t->completion_context);
148       }
149     }
150
151     silc_mutex_lock(lock);
152     if (t->stop)
153       goto stop;
154
155     /* Check if there are calls in queue.  Takes the most recently added
156        call since new ones are added at the start of the list. */
157     if (silc_list_count(t->queue) > 0) {
158     execute_queue:
159       silc_list_start(t->queue);
160       q = silc_list_get(t->queue);
161
162       SILC_LOG_DEBUG(("Execute call from queue"));
163
164       /* Execute this call now */
165       t->run = q->run;
166       t->run_context = q->run_context;
167       t->completion = q->completion;
168       t->completion_context = q->completion_context;
169       t->schedule = q->schedule;
170
171       silc_list_del(t->queue, q);
172       silc_list_add(t->free_queue, q);
173       silc_mutex_unlock(lock);
174       goto execute;
175     }
176
177     silc_mutex_unlock(lock);
178     silc_mutex_lock(tp->lock);
179
180     /* Nothing to do.  Attempt to steal call from some other thread. */
181     o = silc_list_get(tp->threads);
182     if (!o) {
183       /* List wraps around */
184       silc_list_start(tp->threads);
185       o = silc_list_get(tp->threads);
186     }
187
188     /* Check that the other thread is valid and has something to execute. */
189     silc_mutex_lock(o->lock);
190     if (o == t || o->stop || silc_list_count(o->queue) == 0) {
191       silc_mutex_unlock(o->lock);
192       o = NULL;
193     }
194
195     if (o) {
196       silc_mutex_unlock(tp->lock);
197       silc_list_start(o->queue);
198       q = silc_list_get(o->queue);
199
200       SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
201
202       /* Execute this call now */
203       t->run = q->run;
204       t->run_context = q->run_context;
205       t->completion = q->completion;
206       t->completion_context = q->completion_context;
207       t->schedule = q->schedule;
208
209       silc_list_del(o->queue, q);
210       silc_list_add(o->free_queue, q);
211       silc_mutex_unlock(o->lock);
212       goto execute;
213     }
214
215     silc_mutex_lock(lock);
216     if (t->stop) {
217       silc_mutex_unlock(tp->lock);
218       goto stop;
219     }
220
221     /* Now that we have the lock back, check the queue again. */
222     if (silc_list_count(t->queue) > 0) {
223       silc_mutex_unlock(tp->lock);
224       goto execute_queue;
225     }
226
227     /* The thread is now free for use again. */
228     t->run = NULL;
229     t->completion = NULL;
230     t->schedule = NULL;
231     silc_list_add(tp->free_threads, t);
232     silc_mutex_unlock(tp->lock);
233   }
234
235  stop:
236   /* Stop the thread.  Remove from threads list. */
237   SILC_LOG_DEBUG(("Stop thread %p", t));
238
239   /* We can unlock the thread now.  After we get the thread pool lock
240      no one can retrieve the thread anymore. */
241   silc_mutex_unlock(lock);
242   silc_mutex_lock(tp->lock);
243
244   silc_list_del(tp->threads, t);
245   silc_list_start(tp->threads);
246
247   /* Clear thread's call queue. */
248   silc_list_start(t->queue);
249   silc_list_start(t->free_queue);
250   while ((q = silc_list_get(t->queue)))
251     silc_sfree(tp->stack, q);
252   while ((q = silc_list_get(t->free_queue)))
253     silc_sfree(tp->stack, q);
254
255   /* Destroy the thread */
256   silc_mutex_free(lock);
257   silc_cond_free(thread_signal);
258   silc_sfree(tp->stack, t);
259
260   /* If we are last thread, signal the waiting destructor. */
261   if (silc_list_count(tp->threads) == 0)
262     silc_cond_signal(tp->pool_signal);
263
264   /* Release pool reference.  Releases lock also. */
265   silc_thread_pool_unref(tp);
266
267   return NULL;
268 }
269
270 /* Creates new thread to thread pool */
271
272 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
273 {
274   SilcThreadPoolThread t;
275
276   t = silc_scalloc(tp->stack, 1, sizeof(*t));
277   if (!t)
278     return NULL;
279
280   if (!silc_mutex_alloc(&t->lock)) {
281     silc_sfree(tp->stack, t);
282     return NULL;
283   }
284
285   if (!silc_cond_alloc(&t->thread_signal)) {
286     silc_mutex_free(t->lock);
287     silc_sfree(tp->stack, t);
288     return NULL;
289   }
290
291   t->tp = tp;
292   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
293   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
294
295   /* Add to thread pool */
296   silc_list_add(tp->threads, t);
297   silc_list_add(tp->free_threads, t);
298   silc_thread_pool_ref(tp);
299
300   SILC_LOG_DEBUG(("Start thread %p", t));
301
302   /* Start the thread */
303   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
304
305   return t;
306 }
307
308 /**************************** Thread Pool API *******************************/
309
310 /* Allocate thread pool */
311
312 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
313                                       SilcUInt32 min_threads,
314                                       SilcUInt32 max_threads,
315                                       SilcBool start_min_threads)
316 {
317   SilcThreadPool tp;
318   int i;
319
320   if (max_threads < min_threads)
321     return NULL;
322   if (!max_threads)
323     return NULL;
324
325   if (stack)
326     stack = silc_stack_alloc(0, stack);
327
328   tp = silc_scalloc(stack, 1, sizeof(*tp));
329   if (!tp) {
330     silc_stack_free(stack);
331     return NULL;
332   }
333
334   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
335                   tp, min_threads, max_threads));
336
337   tp->stack = stack;
338   tp->min_threads = min_threads;
339   tp->max_threads = max_threads;
340   tp->refcnt++;
341
342   if (!silc_mutex_alloc(&tp->lock)) {
343     silc_sfree(stack, tp);
344     silc_stack_free(stack);
345     return NULL;
346   }
347
348   if (!silc_cond_alloc(&tp->pool_signal)) {
349     silc_mutex_free(tp->lock);
350     silc_sfree(stack, tp);
351     silc_stack_free(stack);
352     return NULL;
353   }
354
355   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
356   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
357
358   for (i = 0; i < tp->min_threads && start_min_threads; i++)
359     silc_thread_pool_new_thread(tp);
360
361   silc_list_start(tp->threads);
362
363   return tp;
364 }
365
366 /* Free thread pool */
367
368 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
369 {
370   SilcThreadPoolThread t;
371
372   SILC_LOG_DEBUG(("Free thread pool %p", tp));
373
374   silc_mutex_lock(tp->lock);
375   tp->destroy = TRUE;
376
377   /* Stop threads */
378   silc_list_start(tp->threads);
379   while ((t = silc_list_get(tp->threads))) {
380     silc_mutex_lock(t->lock);
381     t->stop = TRUE;
382     silc_cond_signal(t->thread_signal);
383     silc_mutex_unlock(t->lock);
384   }
385
386   if (wait_unfinished) {
387     SILC_LOG_DEBUG(("Wait threads to finish"));
388     while (silc_list_count(tp->threads))
389       silc_cond_wait(tp->pool_signal, tp->lock);
390   }
391
392   /* Release reference.  Releases lock also. */
393   silc_thread_pool_unref(tp);
394 }
395
396 /* Execute code in a thread in the pool */
397
398 SilcBool silc_thread_pool_run(SilcThreadPool tp,
399                               SilcBool queuable,
400                               SilcSchedule schedule,
401                               SilcThreadPoolFunc run,
402                               void *run_context,
403                               SilcTaskCallback completion,
404                               void *completion_context)
405 {
406   SilcThreadPoolThread t, q;
407
408   silc_mutex_lock(tp->lock);
409
410   if (tp->destroy) {
411     silc_mutex_unlock(tp->lock);
412     return FALSE;
413   }
414
415   /* Get free thread */
416   silc_list_start(tp->free_threads);
417   t = silc_list_get(tp->free_threads);
418   if (!t || t->stop) {
419     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
420       /* Maximum threads reached */
421       if (!queuable) {
422         silc_mutex_unlock(tp->lock);
423         return FALSE;
424       }
425
426       /* User wants to queue this call until thread becomes free.  Get
427          a thread to assign this call. */
428       t = silc_list_get(tp->threads);
429       if (!t) {
430         /* List wraps around */
431         silc_list_start(tp->threads);
432         t = silc_list_get(tp->threads);
433       }
434       silc_mutex_unlock(tp->lock);
435
436       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
437                       run, run_context, t));
438
439       silc_mutex_lock(t->lock);
440
441       /* Get free call context from the list */
442       silc_list_start(t->free_queue);
443       q = silc_list_get(t->free_queue);
444       if (!q) {
445         q = silc_scalloc(tp->stack, 1, sizeof(*q));
446         if (!q) {
447           silc_mutex_unlock(t->lock);
448           return FALSE;
449         }
450       } else {
451         silc_list_del(t->free_queue, q);
452       }
453
454       q->run = run;
455       q->run_context = run_context;
456       q->completion = completion;
457       q->completion_context = completion_context;
458       q->schedule = schedule;
459
460       /* Add at the start of the list.  It gets executed first. */
461       silc_list_insert(t->queue, NULL, q);
462       silc_mutex_unlock(t->lock);
463       return TRUE;
464     } else {
465       /* Create new thread */
466       t = silc_thread_pool_new_thread(tp);
467       if (!t) {
468         silc_mutex_unlock(tp->lock);
469         return FALSE;
470       }
471     }
472   }
473
474   silc_list_del(tp->free_threads, t);
475   silc_mutex_unlock(tp->lock);
476
477   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
478
479   silc_mutex_lock(t->lock);
480
481   /* Mark this call to be executed in this thread */
482   t->run = run;
483   t->run_context = run_context;
484   t->completion = completion;
485   t->completion_context = completion_context;
486   t->schedule = schedule;
487
488   /* Signal the thread */
489   silc_cond_signal(t->thread_signal);
490   silc_mutex_unlock(t->lock);
491
492   return TRUE;
493 }
494
495 /* Set maximum threads in the pool */
496
497 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
498                                       SilcUInt32 max_threads)
499 {
500   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
501
502   silc_mutex_lock(tp->lock);
503   tp->max_threads = max_threads;
504   silc_mutex_unlock(tp->lock);
505 }
506
507 /* Get maximum threads in the pool */
508
509 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
510 {
511   SilcUInt32 max_threads;
512
513   silc_mutex_lock(tp->lock);
514   max_threads = tp->max_threads;
515   silc_mutex_unlock(tp->lock);
516
517   return max_threads;
518 }
519
520 /* Get numnber of free threads in the pool */
521
522 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
523 {
524   SilcUInt32 free_threads;
525
526   silc_mutex_lock(tp->lock);
527   free_threads = silc_list_count(tp->free_threads);
528   silc_mutex_unlock(tp->lock);
529
530   return free_threads;
531 }
532
533 /* Purge pool */
534
535 void silc_thread_pool_purge(SilcThreadPool tp)
536 {
537   SilcThreadPoolThread t;
538   int i;
539
540   silc_mutex_lock(tp->lock);
541
542   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
543     SILC_LOG_DEBUG(("No threads to purge"));
544     silc_mutex_unlock(tp->lock);
545     return;
546   }
547
548   i = silc_list_count(tp->free_threads) - tp->min_threads;
549
550   SILC_LOG_DEBUG(("Purge %d threads", i));
551
552   silc_list_start(tp->threads);
553   while ((t = silc_list_get(tp->threads))) {
554     silc_mutex_lock(t->lock);
555     if (t->run) {
556       silc_mutex_unlock(t->lock);
557       continue;
558     }
559
560     /* Signal the thread to stop */
561     t->stop = TRUE;
562     silc_cond_signal(t->thread_signal);
563     silc_mutex_unlock(t->lock);
564
565     silc_list_del(tp->free_threads, t);
566
567     i--;
568     if (!i)
569       break;
570   }
571
572   silc_list_start(tp->threads);
573   silc_mutex_unlock(tp->lock);
574 }