Added SILC Thread Queue API
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /***************************** Thread Pool API *****************************/
23
24 /* Explanation of the thread pool execution.
25
26    When new call is added to thread pool by calling silc_thread_pool_run
27    it is assigned to a first free thread from the free list.  If no threads
28    are available we take one from the threads list and assign the call to
29    its queue.  The threads list always takes different thread finally wrapping
30    from the beginning.  This way each thread will get a chance to execute
31    queued calls.
32
33    The thread function silc_thread_pool_run_thread executes each call.  After
34    executing the current call that has been assigned to it, it will check
35    if there are any queued calls in its queue, and it will execute all calls
36    from the queue.  If there aren't any calls in the queue, it will attempt
37    to steal a call from some other thread and execute it.
38
39    The queue list is always consumed in last-in-first-out order.  The most
40    recently added call gets priority.  With full utilization this helps to
41    avoid CPU cache misses.  Since the queues are thread specific with full
42    utilization each thread should always be doing work for the most recent
43    (and thus most important) calls. */
44
45 /************************** Types and definitions ***************************/
46
47 /* Thread pool thread context.  Each thread contains the most current call
48    to be executed, and a list of queued calls. */
49 typedef struct SilcThreadPoolThreadStruct {
50   struct SilcThreadPoolThreadStruct *next;
51   struct SilcThreadPoolThreadStruct *next2;
52   SilcThreadPool tp;                /* The thread pool */
53   SilcCond thread_signal;           /* Condition variable for signalling */
54   SilcMutex lock;                   /* Thread lock */
55   SilcList queue;                   /* Queue for waiting calls */
56   SilcList free_queue;              /* Queue freelist */
57   SilcSchedule schedule;            /* The current Scheduler, may be NULL */
58   SilcThreadPoolFunc run;           /* The current call to run in a thread */
59   SilcTaskCallback completion;      /* The current Completion function */
60   void *run_context;
61   void *completion_context;
62   unsigned int stop        : 1;     /* Set to stop the thread */
63 } *SilcThreadPoolThread;
64
65 /* Thread pool context */
66 struct SilcThreadPoolStruct {
67   SilcStack stack;                  /* Stack for memory allocation */
68   SilcCond pool_signal;             /* Condition variable for signalling */
69   SilcMutex lock;                   /* Pool lock */
70   SilcList threads;                 /* Threads in the pool */
71   SilcList free_threads;            /* Threads freelist */
72   SilcUInt16 min_threads;           /* Minimum threads in the pool */
73   SilcUInt16 max_threads;           /* Maximum threads in the pool */
74   SilcUInt16 refcnt;                /* Reference counter */
75   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
76 };
77
78 /************************ Static utility functions **************************/
79
80 /* Reference thread pool.  Must be called locked. */
81
82 static void silc_thread_pool_ref(SilcThreadPool tp)
83 {
84   tp->refcnt++;
85   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
86                   tp->refcnt));
87 }
88
89 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
90
91 static void silc_thread_pool_unref(SilcThreadPool tp)
92 {
93   tp->refcnt--;
94   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
95                   tp->refcnt));
96   if (!tp->refcnt) {
97     SilcStack stack = tp->stack;
98     silc_mutex_unlock(tp->lock);
99     silc_mutex_free(tp->lock);
100     silc_cond_free(tp->pool_signal);
101     silc_sfree(stack, tp);
102     silc_stack_free(stack);
103     return;
104   }
105   silc_mutex_unlock(tp->lock);
106 }
107
108 /* The thread executor.  Each thread in the pool is run here.  They wait
109    here for something to do which is given to them by silc_thread_pool_run. */
110
111 static void *silc_thread_pool_run_thread(void *context)
112 {
113   SilcThreadPoolThread t = context, o, q;
114   SilcThreadPool tp = t->tp;
115   SilcMutex lock = t->lock;
116   SilcCond thread_signal = t->thread_signal;
117
118   silc_mutex_lock(lock);
119
120   while (1) {
121     /* Wait here for code to execute */
122     while (!t->run && !t->stop)
123       silc_cond_wait(thread_signal, lock);
124
125     if (t->stop)
126       goto stop;
127
128     /* Execute code */
129     silc_mutex_unlock(lock);
130   execute:
131     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
132                     t->run_context, t));
133     t->run(t->schedule, t->run_context);
134
135     /* If scheduler is NULL, call completion directly from here.  Otherwise
136        it is called through the scheduler in the thread where the scheduler
137        is running. */
138     if (t->completion) {
139       if (t->schedule) {
140         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
141         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
142                                             t->completion_context, 0, 0)) {
143           SILC_LOG_DEBUG(("Run completion directly"));
144           t->completion(NULL, NULL, 0, 0, t->completion_context);
145         }
146         silc_schedule_wakeup(t->schedule);
147       } else {
148         SILC_LOG_DEBUG(("Run completion directly"));
149         t->completion(NULL, NULL, 0, 0, t->completion_context);
150       }
151     }
152
153     silc_mutex_lock(lock);
154     if (t->stop)
155       goto stop;
156
157     /* Check if there are calls in queue.  Takes the most recently added
158        call since new ones are added at the start of the list. */
159     if (silc_list_count(t->queue) > 0) {
160     execute_queue:
161       silc_list_start(t->queue);
162       q = silc_list_get(t->queue);
163
164       SILC_LOG_DEBUG(("Execute call from queue"));
165
166       /* Execute this call now */
167       t->run = q->run;
168       t->run_context = q->run_context;
169       t->completion = q->completion;
170       t->completion_context = q->completion_context;
171       t->schedule = q->schedule;
172
173       silc_list_del(t->queue, q);
174       silc_list_add(t->free_queue, q);
175       silc_mutex_unlock(lock);
176       goto execute;
177     }
178
179     silc_mutex_unlock(lock);
180     silc_mutex_lock(tp->lock);
181
182     /* Nothing to do.  Attempt to steal call from some other thread. */
183     o = silc_list_get(tp->threads);
184     if (!o) {
185       /* List wraps around */
186       silc_list_start(tp->threads);
187       o = silc_list_get(tp->threads);
188     }
189
190     /* Check that the other thread is valid and has something to execute. */
191     silc_mutex_lock(o->lock);
192     if (o == t || o->stop || silc_list_count(o->queue) == 0) {
193       silc_mutex_unlock(o->lock);
194       o = NULL;
195     }
196
197     if (o) {
198       silc_mutex_unlock(tp->lock);
199       silc_list_start(o->queue);
200       q = silc_list_get(o->queue);
201
202       SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
203
204       /* Execute this call now */
205       t->run = q->run;
206       t->run_context = q->run_context;
207       t->completion = q->completion;
208       t->completion_context = q->completion_context;
209       t->schedule = q->schedule;
210
211       silc_list_del(o->queue, q);
212       silc_list_add(o->free_queue, q);
213       silc_mutex_unlock(o->lock);
214       goto execute;
215     }
216
217     silc_mutex_lock(lock);
218     if (t->stop) {
219       silc_mutex_unlock(tp->lock);
220       goto stop;
221     }
222
223     /* Now that we have the lock back, check the queue again. */
224     if (silc_list_count(t->queue) > 0) {
225       silc_mutex_unlock(tp->lock);
226       goto execute_queue;
227     }
228
229     /* The thread is now free for use again. */
230     t->run = NULL;
231     t->completion = NULL;
232     t->schedule = NULL;
233     silc_list_add(tp->free_threads, t);
234     silc_mutex_unlock(tp->lock);
235   }
236
237  stop:
238   /* Stop the thread.  Remove from threads list. */
239   SILC_LOG_DEBUG(("Stop thread %p", t));
240
241   /* We can unlock the thread now.  After we get the thread pool lock
242      no one can retrieve the thread anymore. */
243   silc_mutex_unlock(lock);
244   silc_mutex_lock(tp->lock);
245
246   silc_list_del(tp->threads, t);
247   silc_list_start(tp->threads);
248
249   /* Clear thread's call queue. */
250   silc_list_start(t->queue);
251   silc_list_start(t->free_queue);
252   while ((q = silc_list_get(t->queue)))
253     silc_sfree(tp->stack, q);
254   while ((q = silc_list_get(t->free_queue)))
255     silc_sfree(tp->stack, q);
256
257   /* Destroy the thread */
258   silc_mutex_free(lock);
259   silc_cond_free(thread_signal);
260   silc_sfree(tp->stack, t);
261
262   /* If we are last thread, signal the waiting destructor. */
263   if (silc_list_count(tp->threads) == 0)
264     silc_cond_signal(tp->pool_signal);
265
266   /* Release pool reference.  Releases lock also. */
267   silc_thread_pool_unref(tp);
268
269   return NULL;
270 }
271
272 /* Creates new thread to thread pool */
273
274 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
275 {
276   SilcThreadPoolThread t;
277
278   t = silc_scalloc(tp->stack, 1, sizeof(*t));
279   if (!t)
280     return NULL;
281
282   if (!silc_mutex_alloc(&t->lock)) {
283     silc_sfree(tp->stack, t);
284     return NULL;
285   }
286
287   if (!silc_cond_alloc(&t->thread_signal)) {
288     silc_mutex_free(t->lock);
289     silc_sfree(tp->stack, t);
290     return NULL;
291   }
292
293   t->tp = tp;
294   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
295   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
296
297   /* Add to thread pool */
298   silc_list_add(tp->threads, t);
299   silc_list_add(tp->free_threads, t);
300   silc_thread_pool_ref(tp);
301
302   SILC_LOG_DEBUG(("Start thread %p", t));
303
304   /* Start the thread */
305   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
306
307   return t;
308 }
309
310 /**************************** Thread Pool API *******************************/
311
312 /* Allocate thread pool */
313
314 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
315                                       SilcUInt32 min_threads,
316                                       SilcUInt32 max_threads,
317                                       SilcBool start_min_threads)
318 {
319   SilcThreadPool tp;
320   int i;
321
322   if (max_threads < min_threads) {
323     silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
324                           "Max threads is smaller than min threads (%d < %d)",
325                           max_threads, min_threads);
326     return NULL;
327   }
328   if (!max_threads) {
329     silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
330     return NULL;
331   }
332
333   if (stack)
334     stack = silc_stack_alloc(0, stack);
335
336   tp = silc_scalloc(stack, 1, sizeof(*tp));
337   if (!tp) {
338     silc_stack_free(stack);
339     return NULL;
340   }
341
342   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
343                   tp, min_threads, max_threads));
344
345   tp->stack = stack;
346   tp->min_threads = min_threads;
347   tp->max_threads = max_threads;
348   tp->refcnt++;
349
350   if (!silc_mutex_alloc(&tp->lock)) {
351     silc_sfree(stack, tp);
352     silc_stack_free(stack);
353     return NULL;
354   }
355
356   if (!silc_cond_alloc(&tp->pool_signal)) {
357     silc_mutex_free(tp->lock);
358     silc_sfree(stack, tp);
359     silc_stack_free(stack);
360     return NULL;
361   }
362
363   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
364   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
365
366   for (i = 0; i < tp->min_threads && start_min_threads; i++)
367     silc_thread_pool_new_thread(tp);
368
369   silc_list_start(tp->threads);
370
371   return tp;
372 }
373
374 /* Free thread pool */
375
376 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
377 {
378   SilcThreadPoolThread t;
379
380   SILC_LOG_DEBUG(("Free thread pool %p", tp));
381
382   silc_mutex_lock(tp->lock);
383   tp->destroy = TRUE;
384
385   /* Stop threads */
386   silc_list_start(tp->threads);
387   while ((t = silc_list_get(tp->threads))) {
388     silc_mutex_lock(t->lock);
389     t->stop = TRUE;
390     silc_cond_signal(t->thread_signal);
391     silc_mutex_unlock(t->lock);
392   }
393
394   if (wait_unfinished) {
395     SILC_LOG_DEBUG(("Wait threads to finish"));
396     while (silc_list_count(tp->threads))
397       silc_cond_wait(tp->pool_signal, tp->lock);
398   }
399
400   /* Release reference.  Releases lock also. */
401   silc_thread_pool_unref(tp);
402 }
403
404 /* Execute code in a thread in the pool */
405
406 SilcBool silc_thread_pool_run(SilcThreadPool tp,
407                               SilcBool queuable,
408                               SilcSchedule schedule,
409                               SilcThreadPoolFunc run,
410                               void *run_context,
411                               SilcTaskCallback completion,
412                               void *completion_context)
413 {
414   SilcThreadPoolThread t, q;
415
416   silc_mutex_lock(tp->lock);
417
418   if (tp->destroy) {
419     silc_mutex_unlock(tp->lock);
420     silc_set_errno(SILC_ERR_NOT_VALID);
421     return FALSE;
422   }
423
424   /* Get free thread */
425   silc_list_start(tp->free_threads);
426   t = silc_list_get(tp->free_threads);
427   if (!t || t->stop) {
428     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
429       /* Maximum threads reached */
430       if (!queuable) {
431         silc_mutex_unlock(tp->lock);
432         silc_set_errno(SILC_ERR_LIMIT);
433         return FALSE;
434       }
435
436       /* User wants to queue this call until thread becomes free.  Get
437          a thread to assign this call. */
438       t = silc_list_get(tp->threads);
439       if (!t) {
440         /* List wraps around */
441         silc_list_start(tp->threads);
442         t = silc_list_get(tp->threads);
443       }
444       silc_mutex_unlock(tp->lock);
445
446       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
447                       run, run_context, t));
448
449       silc_mutex_lock(t->lock);
450
451       /* Get free call context from the list */
452       silc_list_start(t->free_queue);
453       q = silc_list_get(t->free_queue);
454       if (!q) {
455         q = silc_scalloc(tp->stack, 1, sizeof(*q));
456         if (!q) {
457           silc_mutex_unlock(t->lock);
458           return FALSE;
459         }
460       } else {
461         silc_list_del(t->free_queue, q);
462       }
463
464       q->run = run;
465       q->run_context = run_context;
466       q->completion = completion;
467       q->completion_context = completion_context;
468       q->schedule = schedule;
469
470       /* Add at the start of the list.  It gets executed first. */
471       silc_list_insert(t->queue, NULL, q);
472       silc_mutex_unlock(t->lock);
473       return TRUE;
474     } else {
475       /* Create new thread */
476       t = silc_thread_pool_new_thread(tp);
477       if (!t) {
478         silc_mutex_unlock(tp->lock);
479         return FALSE;
480       }
481     }
482   }
483
484   silc_list_del(tp->free_threads, t);
485   silc_mutex_unlock(tp->lock);
486
487   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
488
489   silc_mutex_lock(t->lock);
490
491   /* Mark this call to be executed in this thread */
492   t->run = run;
493   t->run_context = run_context;
494   t->completion = completion;
495   t->completion_context = completion_context;
496   t->schedule = schedule;
497
498   /* Signal the thread */
499   silc_cond_signal(t->thread_signal);
500   silc_mutex_unlock(t->lock);
501
502   return TRUE;
503 }
504
505 /* Set maximum threads in the pool */
506
507 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
508                                       SilcUInt32 max_threads)
509 {
510   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
511
512   silc_mutex_lock(tp->lock);
513   tp->max_threads = max_threads;
514   silc_mutex_unlock(tp->lock);
515 }
516
517 /* Get maximum threads in the pool */
518
519 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
520 {
521   SilcUInt32 max_threads;
522
523   silc_mutex_lock(tp->lock);
524   max_threads = tp->max_threads;
525   silc_mutex_unlock(tp->lock);
526
527   return max_threads;
528 }
529
530 /* Get numnber of free threads in the pool */
531
532 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
533 {
534   SilcUInt32 free_threads;
535
536   silc_mutex_lock(tp->lock);
537   free_threads = silc_list_count(tp->free_threads);
538   silc_mutex_unlock(tp->lock);
539
540   return free_threads;
541 }
542
543 /* Purge pool */
544
545 void silc_thread_pool_purge(SilcThreadPool tp)
546 {
547   SilcThreadPoolThread t;
548   int i;
549
550   silc_mutex_lock(tp->lock);
551
552   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
553     SILC_LOG_DEBUG(("No threads to purge"));
554     silc_mutex_unlock(tp->lock);
555     return;
556   }
557
558   i = silc_list_count(tp->free_threads) - tp->min_threads;
559
560   SILC_LOG_DEBUG(("Purge %d threads", i));
561
562   silc_list_start(tp->threads);
563   while ((t = silc_list_get(tp->threads))) {
564     silc_mutex_lock(t->lock);
565     if (t->run) {
566       silc_mutex_unlock(t->lock);
567       continue;
568     }
569
570     /* Signal the thread to stop */
571     t->stop = TRUE;
572     silc_cond_signal(t->thread_signal);
573     silc_mutex_unlock(t->lock);
574
575     silc_list_del(tp->free_threads, t);
576
577     i--;
578     if (!i)
579       break;
580   }
581
582   silc_list_start(tp->threads);
583   silc_mutex_unlock(tp->lock);
584 }
585
586 /*************************** Thread-local Storage ***************************/
587
588 void silc_thread_tls_set(void *context)
589 {
590   SilcTls tls = silc_thread_get_tls();
591
592   if (!tls) {
593     /* Initialize Tls for this thread */
594     tls = silc_thread_tls_init();
595     if (!tls)
596       return;
597   }
598
599   tls->thread_context = context;
600 }
601
602 void *silc_thread_tls_get(void)
603 {
604   SilcTls tls = silc_thread_get_tls();
605   if (!tls)
606     return NULL;
607   return tls->thread_context;
608 }