Changed thread pool completion callback to SilcTaskCallback so that
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /************************** Types and definitions ***************************/
23
24 /* Thread pool thread context */
25 typedef struct SilcThreadPoolThreadStruct {
26   struct SilcThreadPoolThreadStruct *next;
27   struct SilcThreadPoolThreadStruct *next2;
28   SilcThreadPool tp;                /* The thread pool */
29   SilcSchedule schedule;            /* Scheduler, may be NULL */
30   SilcThreadPoolFunc run;           /* The function to run in a thread */
31   SilcTaskCallback completion;      /* Completion function */
32   void *run_context;
33   void *completion_context;
34   unsigned int stop        : 1;     /* Set to stop the thread */
35 } *SilcThreadPoolThread;
36
37 /* Thread pool context */
38 struct SilcThreadPoolStruct {
39   SilcStack stack;                  /* Stack for memory allocation */
40   SilcMutex lock;                   /* Pool lock */
41   SilcCond pool_signal;             /* Condition variable for signalling */
42   SilcList threads;                 /* Threads in the pool */
43   SilcList free_threads;            /* Threads freelist */
44   SilcList queue;                   /* Queue for waiting calls */
45   SilcUInt16 min_threads;           /* Minimum threads in the pool */
46   SilcUInt16 max_threads;           /* Maximum threads in the pool */
47   SilcUInt16 refcnt;                /* Reference counter */
48   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
49 };
50
51 /************************ Static utility functions **************************/
52
53 /* Reference thread pool.  Must be called locked. */
54
55 static void silc_thread_pool_ref(SilcThreadPool tp)
56 {
57   tp->refcnt++;
58   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
59                   tp->refcnt));
60 }
61
62 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
63
64 static void silc_thread_pool_unref(SilcThreadPool tp)
65 {
66   tp->refcnt--;
67   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
68                   tp->refcnt));
69   if (!tp->refcnt) {
70     SilcStack stack = tp->stack;
71     silc_mutex_unlock(tp->lock);
72     silc_mutex_free(tp->lock);
73     silc_cond_free(tp->pool_signal);
74     silc_sfree(stack, tp);
75     silc_stack_free(stack);
76     return;
77   }
78   silc_mutex_unlock(tp->lock);
79 }
80
81 /* The thread executor.  Each thread in the pool is run here.  They wait
82    here for something to do which is given to them by silc_thread_pool_run. */
83
84 static void *silc_thread_pool_run_thread(void *context)
85 {
86   SilcThreadPoolThread t = context, q;
87   SilcThreadPool tp = t->tp;
88   SilcMutex lock = tp->lock;
89   SilcCond pool_signal = tp->pool_signal;
90
91   silc_mutex_lock(lock);
92
93   while (1) {
94     /* Wait here for code to execute */
95     while (!t->run && !t->stop)
96       silc_cond_wait(pool_signal, lock);
97
98     if (t->stop) {
99       /* Stop the thread.  Remove from threads list and free memory. */
100       SILC_LOG_DEBUG(("Stop thread %p", t));
101       silc_list_del(tp->threads, t);
102       silc_sfree(tp->stack, t);
103
104       /* If we are last thread, signal the waiting destructor. */
105       if (silc_list_count(tp->threads) == 0)
106         silc_cond_broadcast(pool_signal);
107
108       /* Release pool reference.  Releases lock also. */
109       silc_thread_pool_unref(tp);
110       break;
111     }
112     silc_mutex_unlock(lock);
113
114     /* Execute code */
115     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
116                     t->run_context, t));
117     t->run(t->schedule, t->run_context);
118
119     /* If scheduler is NULL, call completion directly from here.  Otherwise
120        it is called through the scheduler in the thread where the scheduler
121        is running. */
122     if (t->completion) {
123       if (t->schedule) {
124         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
125         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
126                                             t->completion_context, 0, 0)) {
127           SILC_LOG_DEBUG(("Run completion directly"));
128           t->completion(NULL, NULL, 0, 0, t->completion_context);
129         }
130         silc_schedule_wakeup(t->schedule);
131       } else {
132         SILC_LOG_DEBUG(("Run completion directly"));
133         t->completion(NULL, NULL, 0, 0, t->completion_context);
134       }
135     }
136
137     silc_mutex_lock(lock);
138
139     /* Check if there are calls in queue */
140     if (silc_list_count(tp->queue) > 0) {
141       silc_list_start(tp->queue);
142       q = silc_list_get(tp->queue);
143
144       SILC_LOG_DEBUG(("Execute call from queue"));
145
146       /* Execute this call now */
147       t->run = q->run;
148       t->run_context = q->run_context;
149       t->completion = q->completion;
150       t->completion_context = q->completion_context;
151       t->schedule = q->schedule;
152
153       silc_list_del(tp->queue, q);
154       silc_sfree(tp->stack, q);
155       continue;
156     }
157
158     /* The thread is now free for use again. */
159     t->run = NULL;
160     t->completion = NULL;
161     t->schedule = NULL;
162     silc_list_add(tp->free_threads, t);
163   }
164
165   return NULL;
166 }
167
168 /* Creates new thread to thread pool */
169
170 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
171 {
172   SilcThreadPoolThread t;
173
174   t = silc_scalloc(tp->stack, 1, sizeof(*t));
175   if (!t)
176     return NULL;
177   t->tp = tp;
178   silc_list_add(tp->threads, t);
179   silc_list_add(tp->free_threads, t);
180   silc_thread_pool_ref(tp);
181
182   SILC_LOG_DEBUG(("Start thread %p", t));
183
184   /* Start the thread */
185   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
186
187   return t;
188 }
189
190 /**************************** Thread Pool API *******************************/
191
192 /* Allocate thread pool */
193
194 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
195                                       SilcUInt32 min_threads,
196                                       SilcUInt32 max_threads,
197                                       SilcBool start_min_threads)
198 {
199   SilcThreadPool tp;
200   int i;
201
202   if (max_threads < min_threads)
203     return NULL;
204
205   if (stack)
206     stack = silc_stack_alloc(0, stack);
207
208   tp = silc_scalloc(stack, 1, sizeof(*tp));
209   if (!tp) {
210     silc_stack_free(stack);
211     return NULL;
212   }
213
214   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
215                   tp, min_threads, max_threads));
216
217   tp->stack = stack;
218   tp->min_threads = min_threads;
219   tp->max_threads = max_threads;
220   tp->refcnt++;
221
222   if (!silc_mutex_alloc(&tp->lock)) {
223     silc_sfree(stack, tp);
224     silc_stack_free(stack);
225     return NULL;
226   }
227
228   if (!silc_cond_alloc(&tp->pool_signal)) {
229     silc_mutex_free(tp->lock);
230     silc_sfree(stack, tp);
231     silc_stack_free(stack);
232     return NULL;
233   }
234
235   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
236   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
237   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
238
239   for (i = 0; i < tp->min_threads && start_min_threads; i++)
240     silc_thread_pool_new_thread(tp);
241
242   return tp;
243 }
244
245 /* Free thread pool */
246
247 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
248 {
249   SilcThreadPoolThread t;
250
251   SILC_LOG_DEBUG(("Free thread pool %p", tp));
252
253   silc_mutex_lock(tp->lock);
254   tp->destroy = TRUE;
255
256   /* Stop threads */
257   silc_list_start(tp->threads);
258   while ((t = silc_list_get(tp->threads)))
259     t->stop = TRUE;
260   silc_cond_broadcast(tp->pool_signal);
261
262   if (wait_unfinished) {
263     SILC_LOG_DEBUG(("Wait threads to finish"));
264     while (silc_list_count(tp->threads))
265       silc_cond_wait(tp->pool_signal, tp->lock);
266   }
267
268   /* Free calls from queue */
269   silc_list_start(tp->queue);
270   while ((t = silc_list_get(tp->queue)))
271     silc_sfree(tp->stack, t);
272   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
273
274   /* Release reference.  Releases lock also. */
275   silc_thread_pool_unref(tp);
276 }
277
278 /* Execute code in a thread in the pool */
279
280 SilcBool silc_thread_pool_run(SilcThreadPool tp,
281                               SilcBool queuable,
282                               SilcSchedule schedule,
283                               SilcThreadPoolFunc run,
284                               void *run_context,
285                               SilcTaskCallback completion,
286                               void *completion_context)
287 {
288   SilcThreadPoolThread t;
289
290   silc_mutex_lock(tp->lock);
291
292   if (tp->destroy) {
293     silc_mutex_unlock(tp->lock);
294     return FALSE;
295   }
296
297   /* Get free thread */
298   silc_list_start(tp->free_threads);
299   t = silc_list_get(tp->free_threads);
300   if (!t) {
301     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
302       /* Maximum threads reached */
303       if (!queuable) {
304         silc_mutex_unlock(tp->lock);
305         return FALSE;
306       }
307
308       SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
309
310       /* User wants to queue this call until thread becomes free */
311       t = silc_scalloc(tp->stack, 1, sizeof(*t));
312       if (!t) {
313         silc_mutex_unlock(tp->lock);
314         return FALSE;
315       }
316
317       t->run = run;
318       t->run_context = run_context;
319       t->completion = completion;
320       t->completion_context = completion_context;
321       t->schedule = schedule;
322
323       silc_list_add(tp->queue, t);
324       silc_mutex_unlock(tp->lock);
325       return TRUE;
326     } else {
327       /* Create new thread */
328       t = silc_thread_pool_new_thread(tp);
329       if (!t) {
330         silc_mutex_unlock(tp->lock);
331         return FALSE;
332       }
333     }
334   }
335
336   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
337
338   /* Mark this call to be executed in this thread */
339   t->run = run;
340   t->run_context = run_context;
341   t->completion = completion;
342   t->completion_context = completion_context;
343   t->schedule = schedule;
344   silc_list_del(tp->free_threads, t);
345
346   /* Signal threads */
347   silc_cond_broadcast(tp->pool_signal);
348
349   silc_mutex_unlock(tp->lock);
350   return TRUE;
351 }
352
353 /* Set maximum threads in the pool */
354
355 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
356                                       SilcUInt32 max_threads)
357 {
358   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
359
360   silc_mutex_lock(tp->lock);
361   tp->max_threads = max_threads;
362   silc_mutex_unlock(tp->lock);
363 }
364
365 /* Get maximum threads in the pool */
366
367 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
368 {
369   SilcUInt32 max_threads;
370
371   silc_mutex_lock(tp->lock);
372   max_threads = tp->max_threads;
373   silc_mutex_unlock(tp->lock);
374
375   return max_threads;
376 }
377
378 /* Get numnber of free threads in the pool */
379
380 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
381 {
382   SilcUInt32 free_threads;
383
384   silc_mutex_lock(tp->lock);
385   free_threads = silc_list_count(tp->free_threads);
386   silc_mutex_unlock(tp->lock);
387
388   return free_threads;
389 }
390
391 /* Purge pool */
392
393 void silc_thread_pool_purge(SilcThreadPool tp)
394 {
395   SilcThreadPoolThread t;
396   int i;
397
398   silc_mutex_lock(tp->lock);
399
400   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
401     SILC_LOG_DEBUG(("No threads to purge"));
402     silc_mutex_unlock(tp->lock);
403     return;
404   }
405
406   i = silc_list_count(tp->free_threads) - tp->min_threads;
407
408   SILC_LOG_DEBUG(("Purge %d threads", i));
409
410   silc_list_start(tp->threads);
411   while ((t = silc_list_get(tp->threads))) {
412     if (t->run)
413       continue;
414
415     t->stop = TRUE;
416     silc_list_del(tp->free_threads, t);
417
418     i--;
419     if (!i)
420       break;
421   }
422
423   /* Signal threads to stop */
424   silc_cond_broadcast(tp->pool_signal);
425
426   silc_mutex_unlock(tp->lock);
427 }