num_max_thread. -> get_max_threads.
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /************************** Types and definitions ***************************/
23
24 /* Thread pool thread context */
25 typedef struct SilcThreadPoolThreadStruct {
26   struct SilcThreadPoolThreadStruct *next;
27   struct SilcThreadPoolThreadStruct *next2;
28   SilcThreadPool tp;                /* The thread pool */
29   SilcSchedule schedule;            /* Scheduler, may be NULL */
30   SilcThreadPoolFunc run;           /* The function to run in a thread */
31   SilcThreadPoolFunc completion;    /* Completion function */
32   void *run_context;
33   void *completion_context;
34   unsigned int stop        : 1;     /* Set to stop the thread */
35 } *SilcThreadPoolThread;
36
37 /* Completion context */
38 typedef struct SilcThreadPoolCompletionStruct {
39   SilcSchedule schedule;            /* Scheduler, may be NULL */
40   SilcThreadPoolFunc completion;    /* Completion function */
41   void *completion_context;
42 } *SilcThreadPoolCompletion;
43
44 /* Thread pool context */
45 struct SilcThreadPoolStruct {
46   SilcStack stack;                  /* Stack for memory allocation */
47   SilcMutex lock;                   /* Pool lock */
48   SilcCond pool_signal;             /* Condition variable for signalling */
49   SilcList threads;                 /* Threads in the pool */
50   SilcList free_threads;            /* Threads freelist */
51   SilcList queue;                   /* Queue for waiting calls */
52   SilcUInt16 min_threads;           /* Minimum threads in the pool */
53   SilcUInt16 max_threads;           /* Maximum threads in the pool */
54   SilcUInt16 refcnt;                /* Reference counter */
55   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
56 };
57
58 /************************ Static utility functions **************************/
59
60 /* Reference thread pool.  Must be called locked. */
61
62 static void silc_thread_pool_ref(SilcThreadPool tp)
63 {
64   tp->refcnt++;
65   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
66                   tp->refcnt));
67 }
68
69 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
70
71 static void silc_thread_pool_unref(SilcThreadPool tp)
72 {
73   tp->refcnt--;
74   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
75                   tp->refcnt));
76   if (!tp->refcnt) {
77     silc_mutex_unlock(tp->lock);
78     silc_mutex_free(tp->lock);
79     silc_cond_free(tp->pool_signal);
80     silc_sfree(tp->stack, tp);
81     return;
82   }
83   silc_mutex_unlock(tp->lock);
84 }
85
86 /* Thread completion callback */
87
88 SILC_TASK_CALLBACK(silc_thread_pool_run_completion)
89 {
90   SilcThreadPoolCompletion c = context;
91   c->completion(c->schedule, c->completion_context);
92   silc_free(c);
93 }
94
95 /* The thread executor.  Each thread in the pool is run here.  They wait
96    here for something to do which is given to them by silc_thread_pool_run. */
97
98 static void *silc_thread_pool_run_thread(void *context)
99 {
100   SilcThreadPoolThread t = context, q;
101   SilcThreadPool tp = t->tp;
102   SilcMutex lock = tp->lock;
103   SilcCond pool_signal = tp->pool_signal;
104
105   silc_mutex_lock(lock);
106
107   while (1) {
108     /* Wait here for code to execute */
109     while (!t->run && !t->stop)
110       silc_cond_wait(pool_signal, lock);
111
112     if (t->stop) {
113       /* Stop the thread.  Remove from threads list and free memory. */
114       SILC_LOG_DEBUG(("Stop thread %p", t));
115       silc_list_del(tp->threads, t);
116       silc_sfree(tp->stack, t);
117
118       /* If we are last thread, signal the waiting destructor. */
119       if (silc_list_count(tp->threads) == 0)
120         silc_cond_broadcast(pool_signal);
121
122       /* Release pool reference.  Releases lock also. */
123       silc_thread_pool_unref(tp);
124       break;
125     }
126     silc_mutex_unlock(lock);
127
128     /* Execute code */
129     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
130                     t->run_context, t));
131     t->run(t->schedule, t->run_context);
132
133     /* If scheduler is NULL, call completion directly from here.  Otherwise
134        it is called through the scheduler in the thread where the scheduler
135        is running. */
136     if (t->completion) {
137       if (t->schedule) {
138         SilcThreadPoolCompletion c = silc_calloc(1, sizeof(*c));
139         if (c) {
140           SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
141           c->schedule = t->schedule;
142           c->completion = t->completion;
143           c->completion_context = t->completion_context;
144           silc_schedule_task_add_timeout(c->schedule,
145                                          silc_thread_pool_run_completion, c,
146                                          0, 0);
147           silc_schedule_wakeup(c->schedule);
148         } else {
149           t->completion(NULL, t->completion_context);
150         }
151       } else {
152         SILC_LOG_DEBUG(("Run completion directly"));
153         t->completion(NULL, t->completion_context);
154       }
155     }
156
157     silc_mutex_lock(lock);
158
159     /* Check if there are calls in queue */
160     if (silc_list_count(tp->queue) > 0) {
161       silc_list_start(tp->queue);
162       q = silc_list_get(tp->queue);
163
164       SILC_LOG_DEBUG(("Execute call from queue"));
165
166       /* Execute this call now */
167       t->run = q->run;
168       t->run_context = q->run_context;
169       t->completion = q->completion;
170       t->completion_context = q->completion_context;
171       t->schedule = q->schedule;
172
173       silc_list_del(tp->queue, q);
174       silc_sfree(tp->stack, q);
175       continue;
176     }
177
178     /* The thread is now free for use again. */
179     t->run = NULL;
180     t->completion = NULL;
181     t->schedule = NULL;
182     silc_list_add(tp->free_threads, t);
183   }
184
185   return NULL;
186 }
187
188 /* Creates new thread to thread pool */
189
190 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
191 {
192   SilcThreadPoolThread t;
193
194   t = silc_scalloc(tp->stack, 1, sizeof(*t));
195   if (!t)
196     return NULL;
197   t->tp = tp;
198   silc_list_add(tp->threads, t);
199   silc_list_add(tp->free_threads, t);
200   silc_thread_pool_ref(tp);
201
202   SILC_LOG_DEBUG(("Start thread %p", t));
203
204   /* Start the thread */
205   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
206
207   return t;
208 }
209
210 /**************************** Thread Pool API *******************************/
211
212 /* Allocate thread pool */
213
214 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
215                                       SilcUInt32 min_threads,
216                                       SilcUInt32 max_threads,
217                                       SilcBool start_min_threads)
218 {
219   SilcThreadPool tp;
220   int i;
221
222   if (max_threads < min_threads)
223     return NULL;
224
225   tp = silc_scalloc(stack, 1, sizeof(*tp));
226   if (!tp)
227     return NULL;
228
229   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
230                   tp, min_threads, max_threads));
231
232   tp->stack = stack;
233   tp->min_threads = min_threads;
234   tp->max_threads = max_threads;
235   tp->refcnt++;
236
237   if (!silc_mutex_alloc(&tp->lock)) {
238     silc_sfree(stack, tp);
239     return NULL;
240   }
241
242   if (!silc_cond_alloc(&tp->pool_signal)) {
243     silc_mutex_free(tp->lock);
244     silc_sfree(stack, tp);
245     return NULL;
246   }
247
248   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
249   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
250   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
251
252   for (i = 0; i < tp->min_threads && start_min_threads; i++)
253     silc_thread_pool_new_thread(tp);
254
255   return tp;
256 }
257
258 /* Free thread pool */
259
260 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
261 {
262   SilcThreadPoolThread t;
263
264   SILC_LOG_DEBUG(("Free thread pool %p", tp));
265
266   silc_mutex_lock(tp->lock);
267   tp->destroy = TRUE;
268
269   /* Stop threads */
270   silc_list_start(tp->threads);
271   while ((t = silc_list_get(tp->threads)))
272     t->stop = TRUE;
273   silc_cond_broadcast(tp->pool_signal);
274
275   if (wait_unfinished) {
276     SILC_LOG_DEBUG(("Wait threads to finish"));
277     while (silc_list_count(tp->threads))
278       silc_cond_wait(tp->pool_signal, tp->lock);
279   }
280
281   /* Free calls from queue */
282   silc_list_start(tp->queue);
283   while ((t = silc_list_get(tp->queue)))
284     silc_sfree(tp->stack, t);
285   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
286
287   /* Release reference.  Releases lock also. */
288   silc_thread_pool_unref(tp);
289 }
290
291 /* Execute code in a thread in the pool */
292
293 SilcBool silc_thread_pool_run(SilcThreadPool tp,
294                               SilcBool queuable,
295                               SilcSchedule schedule,
296                               SilcThreadPoolFunc run,
297                               void *run_context,
298                               SilcThreadPoolFunc completion,
299                               void *completion_context)
300 {
301   SilcThreadPoolThread t;
302
303   silc_mutex_lock(tp->lock);
304
305   if (tp->destroy) {
306     silc_mutex_unlock(tp->lock);
307     return FALSE;
308   }
309
310   /* Get free thread */
311   silc_list_start(tp->free_threads);
312   t = silc_list_get(tp->free_threads);
313   if (!t) {
314     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
315       /* Maximum threads reached */
316       if (!queuable) {
317         silc_mutex_unlock(tp->lock);
318         return FALSE;
319       }
320
321       SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
322
323       /* User wants to queue this call until thread becomes free */
324       t = silc_scalloc(tp->stack, 1, sizeof(*t));
325       if (!t) {
326         silc_mutex_unlock(tp->lock);
327         return FALSE;
328       }
329
330       t->run = run;
331       t->run_context = run_context;
332       t->completion = completion;
333       t->completion_context = completion_context;
334       t->schedule = schedule;
335
336       silc_list_add(tp->queue, t);
337       silc_mutex_unlock(tp->lock);
338       return TRUE;
339     } else {
340       /* Create new thread */
341       t = silc_thread_pool_new_thread(tp);
342       if (!t) {
343         silc_mutex_unlock(tp->lock);
344         return FALSE;
345       }
346     }
347   }
348
349   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
350
351   /* Mark this call to be executed in this thread */
352   t->run = run;
353   t->run_context = run_context;
354   t->completion = completion;
355   t->completion_context = completion_context;
356   t->schedule = schedule;
357   silc_list_del(tp->free_threads, t);
358
359   /* Signal threads */
360   silc_cond_broadcast(tp->pool_signal);
361
362   silc_mutex_unlock(tp->lock);
363   return TRUE;
364 }
365
366 /* Set maximum threads in the pool */
367
368 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
369                                       SilcUInt32 max_threads)
370 {
371   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
372
373   silc_mutex_lock(tp->lock);
374   tp->max_threads = max_threads;
375   silc_mutex_unlock(tp->lock);
376 }
377
378 /* Get maximum threads in the pool */
379
380 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
381 {
382   SilcUInt32 max_threads;
383
384   silc_mutex_lock(tp->lock);
385   max_threads = tp->max_threads;
386   silc_mutex_unlock(tp->lock);
387
388   return max_threads;
389 }
390
391 /* Get numnber of free threads in the pool */
392
393 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
394 {
395   SilcUInt32 free_threads;
396
397   silc_mutex_lock(tp->lock);
398   free_threads = silc_list_count(tp->free_threads);
399   silc_mutex_unlock(tp->lock);
400
401   return free_threads;
402 }
403
404 /* Purge pool */
405
406 void silc_thread_pool_purge(SilcThreadPool tp)
407 {
408   SilcThreadPoolThread t;
409   int i;
410
411   silc_mutex_lock(tp->lock);
412
413   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
414     SILC_LOG_DEBUG(("No threads to purge"));
415     silc_mutex_unlock(tp->lock);
416     return;
417   }
418
419   i = silc_list_count(tp->free_threads) - tp->min_threads;
420
421   SILC_LOG_DEBUG(("Purge %d threads", i));
422
423   silc_list_start(tp->threads);
424   while ((t = silc_list_get(tp->threads))) {
425     if (t->run)
426       continue;
427
428     t->stop = TRUE;
429     silc_list_del(tp->free_threads, t);
430
431     i--;
432     if (!i)
433       break;
434   }
435
436   /* Signal threads to stop */
437   silc_cond_broadcast(tp->pool_signal);
438
439   silc_mutex_unlock(tp->lock);
440 }