Added free list for queued run entries, intead of allocating
[silc.git] / lib / silcutil / silcthread.c
1 /*
2
3   silcthread.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2007 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silc.h"
21
22 /************************** Types and definitions ***************************/
23
24 /* Thread pool thread context */
25 typedef struct SilcThreadPoolThreadStruct {
26   struct SilcThreadPoolThreadStruct *next;
27   struct SilcThreadPoolThreadStruct *next2;
28   SilcThreadPool tp;                /* The thread pool */
29   SilcSchedule schedule;            /* Scheduler, may be NULL */
30   SilcThreadPoolFunc run;           /* The function to run in a thread */
31   SilcTaskCallback completion;      /* Completion function */
32   void *run_context;
33   void *completion_context;
34   unsigned int stop        : 1;     /* Set to stop the thread */
35 } *SilcThreadPoolThread;
36
37 /* Thread pool context */
38 struct SilcThreadPoolStruct {
39   SilcStack stack;                  /* Stack for memory allocation */
40   SilcMutex lock;                   /* Pool lock */
41   SilcCond pool_signal;             /* Condition variable for signalling */
42   SilcList threads;                 /* Threads in the pool */
43   SilcList free_threads;            /* Threads freelist */
44   SilcList queue;                   /* Queue for waiting calls */
45   SilcList free_queue;              /* Queue freelist */
46   SilcUInt16 min_threads;           /* Minimum threads in the pool */
47   SilcUInt16 max_threads;           /* Maximum threads in the pool */
48   SilcUInt16 refcnt;                /* Reference counter */
49   unsigned int destroy       : 1;   /* Set when pool is to be destroyed */
50 };
51
52 /************************ Static utility functions **************************/
53
54 /* Reference thread pool.  Must be called locked. */
55
56 static void silc_thread_pool_ref(SilcThreadPool tp)
57 {
58   tp->refcnt++;
59   SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
60                   tp->refcnt));
61 }
62
63 /* Unreference thread pool.  Must be called locked.  Releases the lock. */
64
65 static void silc_thread_pool_unref(SilcThreadPool tp)
66 {
67   tp->refcnt--;
68   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
69                   tp->refcnt));
70   if (!tp->refcnt) {
71     SilcStack stack = tp->stack;
72     silc_mutex_unlock(tp->lock);
73     silc_mutex_free(tp->lock);
74     silc_cond_free(tp->pool_signal);
75     silc_sfree(stack, tp);
76     silc_stack_free(stack);
77     return;
78   }
79   silc_mutex_unlock(tp->lock);
80 }
81
82 /* The thread executor.  Each thread in the pool is run here.  They wait
83    here for something to do which is given to them by silc_thread_pool_run. */
84
85 static void *silc_thread_pool_run_thread(void *context)
86 {
87   SilcThreadPoolThread t = context, q;
88   SilcThreadPool tp = t->tp;
89   SilcMutex lock = tp->lock;
90   SilcCond pool_signal = tp->pool_signal;
91
92   silc_mutex_lock(lock);
93
94   while (1) {
95     /* Wait here for code to execute */
96     while (!t->run && !t->stop)
97       silc_cond_wait(pool_signal, lock);
98
99     if (t->stop) {
100       /* Stop the thread.  Remove from threads list and free memory. */
101       SILC_LOG_DEBUG(("Stop thread %p", t));
102       silc_list_del(tp->threads, t);
103       silc_sfree(tp->stack, t);
104
105       /* If we are last thread, signal the waiting destructor. */
106       if (silc_list_count(tp->threads) == 0)
107         silc_cond_broadcast(pool_signal);
108
109       /* Release pool reference.  Releases lock also. */
110       silc_thread_pool_unref(tp);
111       break;
112     }
113     silc_mutex_unlock(lock);
114
115     /* Execute code */
116     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
117                     t->run_context, t));
118     t->run(t->schedule, t->run_context);
119
120     /* If scheduler is NULL, call completion directly from here.  Otherwise
121        it is called through the scheduler in the thread where the scheduler
122        is running. */
123     if (t->completion) {
124       if (t->schedule) {
125         SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
126         if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
127                                             t->completion_context, 0, 0)) {
128           SILC_LOG_DEBUG(("Run completion directly"));
129           t->completion(NULL, NULL, 0, 0, t->completion_context);
130         }
131         silc_schedule_wakeup(t->schedule);
132       } else {
133         SILC_LOG_DEBUG(("Run completion directly"));
134         t->completion(NULL, NULL, 0, 0, t->completion_context);
135       }
136     }
137
138     silc_mutex_lock(lock);
139
140     /* Check if there are calls in queue */
141     if (silc_list_count(tp->queue) > 0) {
142       silc_list_start(tp->queue);
143       q = silc_list_get(tp->queue);
144
145       SILC_LOG_DEBUG(("Execute call from queue"));
146
147       /* Execute this call now */
148       t->run = q->run;
149       t->run_context = q->run_context;
150       t->completion = q->completion;
151       t->completion_context = q->completion_context;
152       t->schedule = q->schedule;
153
154       silc_list_del(tp->queue, q);
155       silc_list_add(tp->free_queue, q);
156       continue;
157     }
158
159     /* The thread is now free for use again. */
160     t->run = NULL;
161     t->completion = NULL;
162     t->schedule = NULL;
163     silc_list_add(tp->free_threads, t);
164   }
165
166   return NULL;
167 }
168
169 /* Creates new thread to thread pool */
170
171 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
172 {
173   SilcThreadPoolThread t;
174
175   t = silc_scalloc(tp->stack, 1, sizeof(*t));
176   if (!t)
177     return NULL;
178   t->tp = tp;
179   silc_list_add(tp->threads, t);
180   silc_list_add(tp->free_threads, t);
181   silc_thread_pool_ref(tp);
182
183   SILC_LOG_DEBUG(("Start thread %p", t));
184
185   /* Start the thread */
186   silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
187
188   return t;
189 }
190
191 /**************************** Thread Pool API *******************************/
192
193 /* Allocate thread pool */
194
195 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
196                                       SilcUInt32 min_threads,
197                                       SilcUInt32 max_threads,
198                                       SilcBool start_min_threads)
199 {
200   SilcThreadPool tp;
201   int i;
202
203   if (max_threads < min_threads)
204     return NULL;
205
206   if (stack)
207     stack = silc_stack_alloc(0, stack);
208
209   tp = silc_scalloc(stack, 1, sizeof(*tp));
210   if (!tp) {
211     silc_stack_free(stack);
212     return NULL;
213   }
214
215   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
216                   tp, min_threads, max_threads));
217
218   tp->stack = stack;
219   tp->min_threads = min_threads;
220   tp->max_threads = max_threads;
221   tp->refcnt++;
222
223   if (!silc_mutex_alloc(&tp->lock)) {
224     silc_sfree(stack, tp);
225     silc_stack_free(stack);
226     return NULL;
227   }
228
229   if (!silc_cond_alloc(&tp->pool_signal)) {
230     silc_mutex_free(tp->lock);
231     silc_sfree(stack, tp);
232     silc_stack_free(stack);
233     return NULL;
234   }
235
236   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
237   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
238   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
239   silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
240
241   for (i = 0; i < tp->min_threads && start_min_threads; i++)
242     silc_thread_pool_new_thread(tp);
243
244   return tp;
245 }
246
247 /* Free thread pool */
248
249 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
250 {
251   SilcThreadPoolThread t;
252
253   SILC_LOG_DEBUG(("Free thread pool %p", tp));
254
255   silc_mutex_lock(tp->lock);
256   tp->destroy = TRUE;
257
258   /* Stop threads */
259   silc_list_start(tp->threads);
260   while ((t = silc_list_get(tp->threads)))
261     t->stop = TRUE;
262   silc_cond_broadcast(tp->pool_signal);
263
264   if (wait_unfinished) {
265     SILC_LOG_DEBUG(("Wait threads to finish"));
266     while (silc_list_count(tp->threads))
267       silc_cond_wait(tp->pool_signal, tp->lock);
268   }
269
270   /* Free calls from queue */
271   silc_list_start(tp->queue);
272   while ((t = silc_list_get(tp->queue)))
273     silc_sfree(tp->stack, t);
274   silc_list_start(tp->free_queue);
275   while ((t = silc_list_get(tp->free_queue)))
276     silc_sfree(tp->stack, t);
277   silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
278   silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
279
280   /* Release reference.  Releases lock also. */
281   silc_thread_pool_unref(tp);
282 }
283
284 /* Execute code in a thread in the pool */
285
286 SilcBool silc_thread_pool_run(SilcThreadPool tp,
287                               SilcBool queuable,
288                               SilcSchedule schedule,
289                               SilcThreadPoolFunc run,
290                               void *run_context,
291                               SilcTaskCallback completion,
292                               void *completion_context)
293 {
294   SilcThreadPoolThread t;
295
296   silc_mutex_lock(tp->lock);
297
298   if (tp->destroy) {
299     silc_mutex_unlock(tp->lock);
300     return FALSE;
301   }
302
303   /* Get free thread */
304   silc_list_start(tp->free_threads);
305   t = silc_list_get(tp->free_threads);
306   if (!t) {
307     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
308       /* Maximum threads reached */
309       if (!queuable) {
310         silc_mutex_unlock(tp->lock);
311         return FALSE;
312       }
313
314       SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
315
316       /* User wants to queue this call until thread becomes free */
317       silc_list_start(tp->free_queue);
318       t = silc_list_get(tp->free_queue);
319       if (!t) {
320         t = silc_scalloc(tp->stack, 1, sizeof(*t));
321         if (!t) {
322           silc_mutex_unlock(tp->lock);
323           return FALSE;
324         }
325       } else {
326         silc_list_del(tp->free_queue, t);
327       }
328
329       t->run = run;
330       t->run_context = run_context;
331       t->completion = completion;
332       t->completion_context = completion_context;
333       t->schedule = schedule;
334
335       silc_list_add(tp->queue, t);
336       silc_mutex_unlock(tp->lock);
337       return TRUE;
338     } else {
339       /* Create new thread */
340       t = silc_thread_pool_new_thread(tp);
341       if (!t) {
342         silc_mutex_unlock(tp->lock);
343         return FALSE;
344       }
345     }
346   }
347
348   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
349
350   /* Mark this call to be executed in this thread */
351   t->run = run;
352   t->run_context = run_context;
353   t->completion = completion;
354   t->completion_context = completion_context;
355   t->schedule = schedule;
356   silc_list_del(tp->free_threads, t);
357
358   /* Signal threads */
359   silc_cond_broadcast(tp->pool_signal);
360
361   silc_mutex_unlock(tp->lock);
362   return TRUE;
363 }
364
365 /* Set maximum threads in the pool */
366
367 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
368                                       SilcUInt32 max_threads)
369 {
370   SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
371
372   silc_mutex_lock(tp->lock);
373   tp->max_threads = max_threads;
374   silc_mutex_unlock(tp->lock);
375 }
376
377 /* Get maximum threads in the pool */
378
379 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
380 {
381   SilcUInt32 max_threads;
382
383   silc_mutex_lock(tp->lock);
384   max_threads = tp->max_threads;
385   silc_mutex_unlock(tp->lock);
386
387   return max_threads;
388 }
389
390 /* Get numnber of free threads in the pool */
391
392 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
393 {
394   SilcUInt32 free_threads;
395
396   silc_mutex_lock(tp->lock);
397   free_threads = silc_list_count(tp->free_threads);
398   silc_mutex_unlock(tp->lock);
399
400   return free_threads;
401 }
402
403 /* Purge pool */
404
405 void silc_thread_pool_purge(SilcThreadPool tp)
406 {
407   SilcThreadPoolThread t;
408   int i;
409
410   silc_mutex_lock(tp->lock);
411
412   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
413     SILC_LOG_DEBUG(("No threads to purge"));
414     silc_mutex_unlock(tp->lock);
415     return;
416   }
417
418   i = silc_list_count(tp->free_threads) - tp->min_threads;
419
420   SILC_LOG_DEBUG(("Purge %d threads", i));
421
422   silc_list_start(tp->threads);
423   while ((t = silc_list_get(tp->threads))) {
424     if (t->run)
425       continue;
426
427     t->stop = TRUE;
428     silc_list_del(tp->free_threads, t);
429
430     i--;
431     if (!i)
432       break;
433   }
434
435   /* Signal threads to stop */
436   silc_cond_broadcast(tp->pool_signal);
437
438   silc_mutex_unlock(tp->lock);
439 }