5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /************************** Types and definitions ***************************/
24 /* Thread pool thread context */
25 typedef struct SilcThreadPoolThreadStruct {
26 struct SilcThreadPoolThreadStruct *next;
27 struct SilcThreadPoolThreadStruct *next2;
28 SilcThreadPool tp; /* The thread pool */
29 SilcSchedule schedule; /* Scheduler, may be NULL */
30 SilcThreadPoolFunc run; /* The function to run in a thread */
31 SilcThreadPoolFunc completion; /* Completion function */
33 void *completion_context;
34 unsigned int stop : 1; /* Set to stop the thread */
35 } *SilcThreadPoolThread;
37 /* Completion context */
38 typedef struct SilcThreadPoolCompletionStruct {
39 SilcSchedule schedule; /* Scheduler, may be NULL */
40 SilcThreadPoolFunc completion; /* Completion function */
41 void *completion_context;
42 } *SilcThreadPoolCompletion;
44 /* Thread pool context */
45 struct SilcThreadPoolStruct {
46 SilcStack stack; /* Stack for memory allocation */
47 SilcMutex lock; /* Pool lock */
48 SilcCond pool_signal; /* Condition variable for signalling */
49 SilcList threads; /* Threads in the pool */
50 SilcList free_threads; /* Threads freelist */
51 SilcList queue; /* Queue for waiting calls */
52 SilcUInt16 min_threads; /* Minimum threads in the pool */
53 SilcUInt16 max_threads; /* Maximum threads in the pool */
54 SilcUInt16 refcnt; /* Reference counter */
55 unsigned int destroy : 1; /* Set when pool is to be destroyed */
58 /************************ Static utility functions **************************/
60 /* Reference thread pool. Must be called locked. */
62 static void silc_thread_pool_ref(SilcThreadPool tp)
65 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
69 /* Unreference thread pool. Must be called locked. Releases the lock. */
71 static void silc_thread_pool_unref(SilcThreadPool tp)
74 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
77 silc_mutex_unlock(tp->lock);
78 silc_mutex_free(tp->lock);
79 silc_cond_free(tp->pool_signal);
83 silc_mutex_unlock(tp->lock);
86 /* Thread completion callback */
88 SILC_TASK_CALLBACK(silc_thread_pool_run_completion)
90 SilcThreadPoolCompletion c = context;
91 c->completion(c->schedule, c->completion_context);
95 /* The thread executor. Each thread in the pool is run here. They wait
96 here for something to do which is given to them by silc_thread_pool_run. */
98 static void *silc_thread_pool_run_thread(void *context)
100 SilcThreadPoolThread t = context, q;
101 SilcThreadPool tp = t->tp;
102 SilcMutex lock = tp->lock;
103 SilcCond pool_signal = tp->pool_signal;
105 silc_mutex_lock(lock);
108 /* Wait here for code to execute */
109 while (!t->run && !t->stop)
110 silc_cond_wait(pool_signal, lock);
113 /* Stop the thread. Remove from threads list and free memory. */
114 SILC_LOG_DEBUG(("Stop thread %p", t));
115 silc_list_del(tp->threads, t);
118 /* If we are last thread, signal the waiting destructor. */
119 if (silc_list_count(tp->threads) == 0)
120 silc_cond_signal(pool_signal);
122 /* Release pool reference. Releases lock also. */
123 silc_thread_pool_unref(tp);
126 silc_mutex_unlock(lock);
129 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
131 t->run(t->schedule, t->run_context);
133 /* If scheduler is NULL, call completion directly from here. Otherwise
134 it is called through the scheduler in the thread where the scheduler
138 SilcThreadPoolCompletion c = silc_calloc(1, sizeof(*c));
140 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
141 c->schedule = t->schedule;
142 c->completion = t->completion;
143 c->completion_context = t->completion_context;
144 silc_schedule_task_add_timeout(c->schedule,
145 silc_thread_pool_run_completion, c,
147 silc_schedule_wakeup(c->schedule);
149 t->completion(NULL, t->completion_context);
152 SILC_LOG_DEBUG(("Run completion directly"));
153 t->completion(NULL, t->completion_context);
157 silc_mutex_lock(lock);
159 /* Check if there are calls in queue */
160 if (silc_list_count(tp->queue) > 0) {
161 silc_list_start(tp->queue);
162 q = silc_list_get(tp->queue);
164 SILC_LOG_DEBUG(("Execute call from queue"));
166 /* Execute this call now */
168 t->run_context = q->run_context;
169 t->completion = q->completion;
170 t->completion_context = q->completion_context;
171 t->schedule = q->schedule;
173 silc_list_del(tp->queue, q);
178 /* The thread is now free for use again. */
180 t->completion = NULL;
182 silc_list_add(tp->free_threads, t);
188 /* Creates new thread to thread pool */
190 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
192 SilcThreadPoolThread t;
194 t = silc_calloc(1, sizeof(*t));
198 silc_list_add(tp->threads, t);
199 silc_list_add(tp->free_threads, t);
200 silc_thread_pool_ref(tp);
202 SILC_LOG_DEBUG(("Start thread %p", t));
204 /* Start the thread */
205 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
210 /**************************** Thread Pool API *******************************/
212 /* Allocate thread pool */
214 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
215 SilcUInt32 min_threads,
216 SilcUInt32 max_threads,
217 SilcBool start_min_threads)
222 if (max_threads < min_threads)
225 tp = silc_calloc(1, sizeof(*tp));
229 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
230 tp, min_threads, max_threads));
233 tp->min_threads = min_threads;
234 tp->max_threads = max_threads;
237 if (!silc_mutex_alloc(&tp->lock)) {
242 if (!silc_cond_alloc(&tp->pool_signal)) {
243 silc_mutex_free(tp->lock);
248 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
249 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
250 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
252 for (i = 0; i < tp->min_threads && start_min_threads; i++)
253 silc_thread_pool_new_thread(tp);
258 /* Free thread pool */
260 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
262 SilcThreadPoolThread t;
264 SILC_LOG_DEBUG(("Free thread pool %p", tp));
266 silc_mutex_lock(tp->lock);
270 silc_list_start(tp->threads);
271 while ((t = silc_list_get(tp->threads)))
273 silc_cond_signal(tp->pool_signal);
275 if (wait_unfinished) {
276 SILC_LOG_DEBUG(("Wait threads to finish"));
277 while (silc_list_count(tp->threads))
278 silc_cond_wait(tp->pool_signal, tp->lock);
281 /* Free calls from queue */
282 silc_list_start(tp->queue);
283 while ((t = silc_list_get(tp->queue)))
285 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
287 /* Release reference. Releases lock also. */
288 silc_thread_pool_unref(tp);
291 /* Execute code in a thread in the pool */
293 SilcBool silc_thread_pool_run(SilcThreadPool tp,
295 SilcSchedule schedule,
296 SilcThreadPoolFunc run,
298 SilcThreadPoolFunc completion,
299 void *completion_context)
301 SilcThreadPoolThread t;
303 silc_mutex_lock(tp->lock);
306 silc_mutex_unlock(tp->lock);
310 /* Get free thread */
311 silc_list_start(tp->free_threads);
312 t = silc_list_get(tp->free_threads);
314 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
315 /* Maximum threads reached */
317 silc_mutex_unlock(tp->lock);
321 SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
323 /* User wants to queue this call until thread becomes free */
324 t = silc_calloc(1, sizeof(*t));
326 silc_mutex_unlock(tp->lock);
331 t->run_context = run_context;
332 t->completion = completion;
333 t->completion_context = completion_context;
334 t->schedule = schedule;
336 silc_list_add(tp->queue, t);
337 silc_mutex_unlock(tp->lock);
340 /* Create new thread */
341 t = silc_thread_pool_new_thread(tp);
343 silc_mutex_unlock(tp->lock);
349 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
351 /* Mark this call to be executed in this thread */
353 t->run_context = run_context;
354 t->completion = completion;
355 t->completion_context = completion_context;
356 t->schedule = schedule;
357 silc_list_del(tp->free_threads, t);
360 silc_cond_signal(tp->pool_signal);
362 silc_mutex_unlock(tp->lock);
366 /* Set maximum threads in the pool */
368 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
369 SilcUInt32 max_threads)
371 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
373 silc_mutex_lock(tp->lock);
374 tp->max_threads = max_threads;
375 silc_mutex_unlock(tp->lock);
378 /* Get maximum threads in the pool */
380 SilcUInt32 silc_thread_pool_num_max_threads(SilcThreadPool tp)
382 SilcUInt32 max_threads;
384 silc_mutex_lock(tp->lock);
385 max_threads = tp->max_threads;
386 silc_mutex_unlock(tp->lock);
391 /* Get numnber of free threads in the pool */
393 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
395 SilcUInt32 free_threads;
397 silc_mutex_lock(tp->lock);
398 free_threads = silc_list_count(tp->free_threads);
399 silc_mutex_unlock(tp->lock);
406 void silc_thread_pool_purge(SilcThreadPool tp)
408 SilcThreadPoolThread t;
411 silc_mutex_lock(tp->lock);
413 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
414 silc_mutex_unlock(tp->lock);
418 i = silc_list_count(tp->free_threads) - tp->min_threads;
420 SILC_LOG_DEBUG(("Purge %d threads", i));
422 silc_list_start(tp->threads);
423 while ((t = silc_list_get(tp->threads))) {
428 silc_list_del(tp->free_threads, t);
435 /* Signal threads to stop */
436 silc_cond_signal(tp->pool_signal);
438 silc_mutex_unlock(tp->lock);