ad53a421692192bbcae4a6f5a529f1c5026990e9
[crypto.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /* Explanation of the thread pool execution.
23
24    When new call is added to thread pool by calling silc_thread_pool_run
25    it is assigned to a first free thread from the free list.  If no threads
26    are available we take one from the threads list and assign the call to
27    its queue.  The threads list always takes different thread finally wrapping
28    from the beginning.  This way each thread will get a chance to execute
29    queued calls.
30
31    The thread function silc_thread_pool_run_thread executes each call.  After
32    executing the current call that has been assigned to it, it will check
33    if there are any queued calls in its queue, and it will execute all calls
34    from the queue.  If there aren't any calls in the queue, it will attempt
35    to steal a call from some other thread and execute it.
36
37    The queue list is always consumed in last-in-first-out order.  The most
38    recently added call gets priority.  With full utilization this helps to
39    avoid CPU cache misses.  Since the queues are thread specific with full
40    utilization each thread should always be doing work for the most recent
41    (and thus most important) calls. */
42
43 /************************** Types and definitions ***************************/
44
45 /* Thread pool thread context.  Each thread contains the most current call
46    to be executed, and a list of queued calls. */
47 typedef struct SilcThreadPoolThreadStruct {
48   struct SilcThreadPoolThreadStruct *next;
49   struct SilcThreadPoolThreadStruct *next2;
50   SilcThreadPool tp;                /* The thread pool */
51   SilcCond thread_signal;           /* Condition variable for signalling */
52   SilcMutex lock;                   /* Thread lock */
53   SilcList queue;                   /* Queue for waiting calls */
54   SilcList free_queue;              /* Queue freelist */
55   SilcSchedule schedule;            /* The current Scheduler, may be NULL */
56   SilcThreadPoolFunc run;           /* The current call to run in a thread */
57   SilcTaskCallback completion;      /* The current Completion function */
58   void *run_context;
59   void *completion_context;
60   unsigned int stop        : 1;     /* Set to stop the thread */
61 } *SilcThreadPoolThread;
62
63 /* Thread pool context */
64 struct SilcThreadPoolStruct {
65   SilcStack stack;                  /* Stack for memory allocation */
66   SilcCond pool_signal;             /* Condition variable for signalling */
67   SilcMutex lock;                   /* Pool lock */
68   SilcList threads;                 /* Threads in the pool */
69   SilcList free_threads;            /* Threads freelist */
70   SilcUInt16 min_threads;           /* Minimum threads in the pool */
71   SilcUInt16 max_threads;           /* Maximum threads in the pool */
72   SilcUInt16 refcnt;                /* Reference counter */
73   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
74 };
75
76 /************************ Static utility functions **************************/
77
78 /* Reference thread pool.  Must be called locked. */
79
80 static void silc_thread_pool_ref(SilcThreadPool tp)
81 {
82   tp->refcnt++;
83   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
84                   tp->refcnt));
85 }
86
87 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
88
89 static void silc_thread_pool_unref(SilcThreadPool tp)
90 {
91   tp->refcnt--;
92   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
93                   tp->refcnt));
94   if (!tp->refcnt) {
95     SilcStack stack = tp->stack;
96     silc_mutex_unlock(tp->lock);
97     silc_mutex_free(tp->lock);
98     silc_cond_free(tp->pool_signal);
99     silc_sfree(stack, tp);
100     silc_stack_free(stack);
101     return;
102   }
103   silc_mutex_unlock(tp->lock);
104 }
105
106 /* The thread executor.  Each thread in the pool is run here.  They wait
107    here for something to do which is given to them by silc_thread_pool_run. */
108
109 static void *silc_thread_pool_run_thread(void *context)
110 {
111   SilcThreadPoolThread t = context, o, q;
112   SilcThreadPool tp = t->tp;
113   SilcMutex lock = t->lock;
114   SilcCond thread_signal = t->thread_signal;
115
116   silc_mutex_lock(lock);
117
118   while (1) {
119     /* Wait here for code to execute */
120     while (!t->run && !t->stop)
121       silc_cond_wait(thread_signal, lock);
122
123     if (silc_unlikely(t->stop)) {
124       /* Stop the thread.  Remove from threads list. */
125       SILC_LOG_DEBUG(("Stop thread %p", t));
126       silc_mutex_lock(tp->lock);
127       silc_list_del(tp->threads, t);
128       silc_list_start(tp->threads);
129
130       /* Clear thread's call queue. */
131       silc_list_start(t->queue);
132       silc_list_start(t->free_queue);
133       while ((q = silc_list_get(t->queue)))
134         silc_sfree(tp->stack, q);
135       while ((q = silc_list_get(t->free_queue)))
136         silc_sfree(tp->stack, q);
137
138       /* Destroy the thread */
139       silc_mutex_unlock(lock);
140       silc_mutex_free(lock);
141       silc_cond_free(thread_signal);
142       silc_sfree(tp->stack, t);
143
144       /* If we are last thread, signal the waiting destructor. */
145       if (silc_list_count(tp->threads) == 0)
146         silc_cond_signal(tp->pool_signal);
147
148       /* Release pool reference.  Releases lock also. */
149       silc_thread_pool_unref(tp);
150       break;
151     }
152     silc_mutex_unlock(lock);
153
154     /* Execute code */
155   execute:
156     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
157                     t->run_context, t));
158     t->run(t->schedule, t->run_context);
159
160     /* If scheduler is NULL, call completion directly from here.  Otherwise
161        it is called through the scheduler in the thread where the scheduler
162        is running. */
163     if (t->completion) {
164       if (t->schedule) {
165         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
166         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
167                                             t->completion_context, 0, 0)) {
168           SILC_LOG_DEBUG(("Run completion directly"));
169           t->completion(NULL, NULL, 0, 0, t->completion_context);
170         }
171         silc_schedule_wakeup(t->schedule);
172       } else {
173         SILC_LOG_DEBUG(("Run completion directly"));
174         t->completion(NULL, NULL, 0, 0, t->completion_context);
175       }
176     }
177
178     /* Check if there are calls in queue.  Takes the most recently added
179        call since new ones are added at the start of the list. */
180     silc_mutex_lock(lock);
181     if (silc_list_count(t->queue) > 0) {
182     execute_queue:
183       silc_list_start(t->queue);
184       q = silc_list_get(t->queue);
185
186       SILC_LOG_DEBUG(("Execute call from queue"));
187
188       /* Execute this call now */
189       t->run = q->run;
190       t->run_context = q->run_context;
191       t->completion = q->completion;
192       t->completion_context = q->completion_context;
193       t->schedule = q->schedule;
194
195       silc_list_del(t->queue, q);
196       silc_list_add(t->free_queue, q);
197       silc_mutex_unlock(lock);
198       goto execute;
199     } else {
200       silc_mutex_unlock(lock);
201
202       /* Nothing to do.  Attempt to steal call from some other thread. */
203       silc_mutex_lock(tp->lock);
204       o = silc_list_get(tp->threads);
205       if (!o) {
206         /* List wraps around */
207         silc_list_start(tp->threads);
208         o = silc_list_get(tp->threads);
209       }
210       silc_mutex_unlock(tp->lock);
211
212       if (o && o != t) {
213         silc_mutex_lock(o->lock);
214         if (silc_list_count(o->queue) > 0) {
215           silc_list_start(o->queue);
216           q = silc_list_get(o->queue);
217
218           SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
219
220           /* Execute this call now */
221           t->run = q->run;
222           t->run_context = q->run_context;
223           t->completion = q->completion;
224           t->completion_context = q->completion_context;
225           t->schedule = q->schedule;
226
227           silc_list_del(o->queue, q);
228           silc_list_add(o->free_queue, q);
229           silc_mutex_unlock(o->lock);
230           goto execute;
231         }
232         silc_mutex_unlock(o->lock);
233       }
234     }
235
236     silc_mutex_lock(lock);
237
238     /* Now that we have the lock back, check the queue again. */
239     if (silc_list_count(t->queue) > 0)
240       goto execute_queue;
241
242     /* The thread is now free for use again. */
243     t->run = NULL;
244     t->completion = NULL;
245     t->schedule = NULL;
246     silc_list_add(tp->free_threads, t);
247   }
248
249   return NULL;
250 }
251
252 /* Creates new thread to thread pool */
253
254 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
255 {
256   SilcThreadPoolThread t;
257
258   t = silc_scalloc(tp->stack, 1, sizeof(*t));
259   if (!t)
260     return NULL;
261
262   if (!silc_mutex_alloc(&t->lock)) {
263     silc_sfree(tp->stack, t);
264     return NULL;
265   }
266
267   if (!silc_cond_alloc(&t->thread_signal)) {
268     silc_mutex_free(t->lock);
269     silc_sfree(tp->stack, t);
270     return NULL;
271   }
272
273   t->tp = tp;
274   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
275   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
276
277   /* Add to thread pool */
278   silc_list_add(tp->threads, t);
279   silc_list_add(tp->free_threads, t);
280   silc_thread_pool_ref(tp);
281
282   SILC_LOG_DEBUG(("Start thread %p", t));
283
284   /* Start the thread */
285   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
286
287   return t;
288 }
289
290 /**************************** Thread Pool API *******************************/
291
292 /* Allocate thread pool */
293
294 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
295                                       SilcUInt32 min_threads,
296                                       SilcUInt32 max_threads,
297                                       SilcBool start_min_threads)
298 {
299   SilcThreadPool tp;
300   int i;
301
302   if (max_threads < min_threads)
303     return NULL;
304   if (!max_threads)
305     return NULL;
306
307   if (stack)
308     stack = silc_stack_alloc(0, stack);
309
310   tp = silc_scalloc(stack, 1, sizeof(*tp));
311   if (!tp) {
312     silc_stack_free(stack);
313     return NULL;
314   }
315
316   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
317                   tp, min_threads, max_threads));
318
319   tp->stack = stack;
320   tp->min_threads = min_threads;
321   tp->max_threads = max_threads;
322   tp->refcnt++;
323
324   if (!silc_mutex_alloc(&tp->lock)) {
325     silc_sfree(stack, tp);
326     silc_stack_free(stack);
327     return NULL;
328   }
329
330   if (!silc_cond_alloc(&tp->pool_signal)) {
331     silc_mutex_free(tp->lock);
332     silc_sfree(stack, tp);
333     silc_stack_free(stack);
334     return NULL;
335   }
336
337   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
338   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
339
340   for (i = 0; i < tp->min_threads && start_min_threads; i++)
341     silc_thread_pool_new_thread(tp);
342
343   silc_list_start(tp->threads);
344
345   return tp;
346 }
347
348 /* Free thread pool */
349
350 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
351 {
352   SilcThreadPoolThread t;
353
354   SILC_LOG_DEBUG(("Free thread pool %p", tp));
355
356   silc_mutex_lock(tp->lock);
357   tp->destroy = TRUE;
358
359   /* Stop threads */
360   silc_list_start(tp->threads);
361   while ((t = silc_list_get(tp->threads))) {
362     silc_mutex_lock(t->lock);
363     t->stop = TRUE;
364     silc_cond_signal(t->thread_signal);
365     silc_mutex_unlock(t->lock);
366   }
367
368   if (wait_unfinished) {
369     SILC_LOG_DEBUG(("Wait threads to finish"));
370     while (silc_list_count(tp->threads))
371       silc_cond_wait(tp->pool_signal, tp->lock);
372   }
373
374   /* Release reference.  Releases lock also. */
375   silc_thread_pool_unref(tp);
376 }
377
378 /* Execute code in a thread in the pool */
379
380 SilcBool silc_thread_pool_run(SilcThreadPool tp,
381                               SilcBool queuable,
382                               SilcSchedule schedule,
383                               SilcThreadPoolFunc run,
384                               void *run_context,
385                               SilcTaskCallback completion,
386                               void *completion_context)
387 {
388   SilcThreadPoolThread t, q;
389
390   silc_mutex_lock(tp->lock);
391
392   if (tp->destroy) {
393     silc_mutex_unlock(tp->lock);
394     return FALSE;
395   }
396
397   /* Get free thread */
398   silc_list_start(tp->free_threads);
399   t = silc_list_get(tp->free_threads);
400   if (!t) {
401     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
402       /* Maximum threads reached */
403       if (!queuable) {
404         silc_mutex_unlock(tp->lock);
405         return FALSE;
406       }
407
408       /* User wants to queue this call until thread becomes free.  Get
409          a thread to assign this call. */
410       t = silc_list_get(tp->threads);
411       if (!t) {
412         /* List wraps around */
413         silc_list_start(tp->threads);
414         t = silc_list_get(tp->threads);
415       }
416       silc_mutex_unlock(tp->lock);
417
418       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
419                       run, run_context, t));
420
421       silc_mutex_lock(t->lock);
422
423       /* Get free call context from the list */
424       silc_list_start(t->free_queue);
425       q = silc_list_get(t->free_queue);
426       if (!q) {
427         q = silc_scalloc(tp->stack, 1, sizeof(*q));
428         if (!q) {
429           silc_mutex_unlock(t->lock);
430           return FALSE;
431         }
432       } else {
433         silc_list_del(t->free_queue, q);
434       }
435
436       q->run = run;
437       q->run_context = run_context;
438       q->completion = completion;
439       q->completion_context = completion_context;
440       q->schedule = schedule;
441
442       /* Add at the start of the list.  It gets executed first. */
443       silc_list_insert(t->queue, NULL, q);
444       silc_mutex_unlock(t->lock);
445       return TRUE;
446     } else {
447       /* Create new thread */
448       t = silc_thread_pool_new_thread(tp);
449       if (!t) {
450         silc_mutex_unlock(tp->lock);
451         return FALSE;
452       }
453     }
454   }
455
456   silc_list_del(tp->free_threads, t);
457   silc_mutex_unlock(tp->lock);
458
459   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
460
461   silc_mutex_lock(t->lock);
462
463   /* Mark this call to be executed in this thread */
464   t->run = run;
465   t->run_context = run_context;
466   t->completion = completion;
467   t->completion_context = completion_context;
468   t->schedule = schedule;
469
470   /* Signal the thread */
471   silc_cond_signal(t->thread_signal);
472   silc_mutex_unlock(t->lock);
473
474   return TRUE;
475 }
476
477 /* Set maximum threads in the pool */
478
479 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
480                                       SilcUInt32 max_threads)
481 {
482   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
483
484   silc_mutex_lock(tp->lock);
485   tp->max_threads = max_threads;
486   silc_mutex_unlock(tp->lock);
487 }
488
489 /* Get maximum threads in the pool */
490
491 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
492 {
493   SilcUInt32 max_threads;
494
495   silc_mutex_lock(tp->lock);
496   max_threads = tp->max_threads;
497   silc_mutex_unlock(tp->lock);
498
499   return max_threads;
500 }
501
502 /* Get numnber of free threads in the pool */
503
504 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
505 {
506   SilcUInt32 free_threads;
507
508   silc_mutex_lock(tp->lock);
509   free_threads = silc_list_count(tp->free_threads);
510   silc_mutex_unlock(tp->lock);
511
512   return free_threads;
513 }
514
515 /* Purge pool */
516
517 void silc_thread_pool_purge(SilcThreadPool tp)
518 {
519   SilcThreadPoolThread t;
520   int i;
521
522   silc_mutex_lock(tp->lock);
523
524   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
525     SILC_LOG_DEBUG(("No threads to purge"));
526     silc_mutex_unlock(tp->lock);
527     return;
528   }
529
530   i = silc_list_count(tp->free_threads) - tp->min_threads;
531
532   SILC_LOG_DEBUG(("Purge %d threads", i));
533
534   silc_list_start(tp->threads);
535   while ((t = silc_list_get(tp->threads))) {
536     silc_mutex_lock(t->lock);
537     if (t->run) {
538       silc_mutex_unlock(t->lock);
539       continue;
540     }
541
542     /* Signal the thread to stop */
543     t->stop = TRUE;
544     silc_cond_signal(t->thread_signal);
545     silc_mutex_unlock(t->lock);
546
547     silc_list_del(tp->free_threads, t);
548
549     i--;
550     if (!i)
551       break;
552   }
553
554   silc_list_start(tp->threads);
555   silc_mutex_unlock(tp->lock);
556 }