5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /************************** Types and definitions ***************************/
24 /* Thread pool thread context */
25 typedef struct SilcThreadPoolThreadStruct {
26 struct SilcThreadPoolThreadStruct *next;
27 struct SilcThreadPoolThreadStruct *next2;
28 SilcThreadPool tp; /* The thread pool */
29 SilcSchedule schedule; /* Scheduler, may be NULL */
30 SilcThreadPoolFunc run; /* The function to run in a thread */
31 SilcTaskCallback completion; /* Completion function */
33 void *completion_context;
34 unsigned int stop : 1; /* Set to stop the thread */
35 } *SilcThreadPoolThread;
37 /* Thread pool context */
38 struct SilcThreadPoolStruct {
39 SilcStack stack; /* Stack for memory allocation */
40 SilcMutex lock; /* Pool lock */
41 SilcCond pool_signal; /* Condition variable for signalling */
42 SilcList threads; /* Threads in the pool */
43 SilcList free_threads; /* Threads freelist */
44 SilcList queue; /* Queue for waiting calls */
45 SilcUInt16 min_threads; /* Minimum threads in the pool */
46 SilcUInt16 max_threads; /* Maximum threads in the pool */
47 SilcUInt16 refcnt; /* Reference counter */
48 unsigned int destroy : 1; /* Set when pool is to be destroyed */
51 /************************ Static utility functions **************************/
53 /* Reference thread pool. Must be called locked. */
55 static void silc_thread_pool_ref(SilcThreadPool tp)
58 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
62 /* Unreference thread pool. Must be called locked. Releases the lock. */
64 static void silc_thread_pool_unref(SilcThreadPool tp)
67 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
70 SilcStack stack = tp->stack;
71 silc_mutex_unlock(tp->lock);
72 silc_mutex_free(tp->lock);
73 silc_cond_free(tp->pool_signal);
74 silc_sfree(stack, tp);
75 silc_stack_free(stack);
78 silc_mutex_unlock(tp->lock);
81 /* The thread executor. Each thread in the pool is run here. They wait
82 here for something to do which is given to them by silc_thread_pool_run. */
84 static void *silc_thread_pool_run_thread(void *context)
86 SilcThreadPoolThread t = context, q;
87 SilcThreadPool tp = t->tp;
88 SilcMutex lock = tp->lock;
89 SilcCond pool_signal = tp->pool_signal;
91 silc_mutex_lock(lock);
94 /* Wait here for code to execute */
95 while (!t->run && !t->stop)
96 silc_cond_wait(pool_signal, lock);
99 /* Stop the thread. Remove from threads list and free memory. */
100 SILC_LOG_DEBUG(("Stop thread %p", t));
101 silc_list_del(tp->threads, t);
102 silc_sfree(tp->stack, t);
104 /* If we are last thread, signal the waiting destructor. */
105 if (silc_list_count(tp->threads) == 0)
106 silc_cond_broadcast(pool_signal);
108 /* Release pool reference. Releases lock also. */
109 silc_thread_pool_unref(tp);
112 silc_mutex_unlock(lock);
115 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
117 t->run(t->schedule, t->run_context);
119 /* If scheduler is NULL, call completion directly from here. Otherwise
120 it is called through the scheduler in the thread where the scheduler
124 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
125 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
126 t->completion_context, 0, 0)) {
127 SILC_LOG_DEBUG(("Run completion directly"));
128 t->completion(NULL, NULL, 0, 0, t->completion_context);
130 silc_schedule_wakeup(t->schedule);
132 SILC_LOG_DEBUG(("Run completion directly"));
133 t->completion(NULL, NULL, 0, 0, t->completion_context);
137 silc_mutex_lock(lock);
139 /* Check if there are calls in queue */
140 if (silc_list_count(tp->queue) > 0) {
141 silc_list_start(tp->queue);
142 q = silc_list_get(tp->queue);
144 SILC_LOG_DEBUG(("Execute call from queue"));
146 /* Execute this call now */
148 t->run_context = q->run_context;
149 t->completion = q->completion;
150 t->completion_context = q->completion_context;
151 t->schedule = q->schedule;
153 silc_list_del(tp->queue, q);
154 silc_sfree(tp->stack, q);
158 /* The thread is now free for use again. */
160 t->completion = NULL;
162 silc_list_add(tp->free_threads, t);
168 /* Creates new thread to thread pool */
170 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
172 SilcThreadPoolThread t;
174 t = silc_scalloc(tp->stack, 1, sizeof(*t));
178 silc_list_add(tp->threads, t);
179 silc_list_add(tp->free_threads, t);
180 silc_thread_pool_ref(tp);
182 SILC_LOG_DEBUG(("Start thread %p", t));
184 /* Start the thread */
185 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
190 /**************************** Thread Pool API *******************************/
192 /* Allocate thread pool */
194 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
195 SilcUInt32 min_threads,
196 SilcUInt32 max_threads,
197 SilcBool start_min_threads)
202 if (max_threads < min_threads)
206 stack = silc_stack_alloc(0, stack);
208 tp = silc_scalloc(stack, 1, sizeof(*tp));
210 silc_stack_free(stack);
214 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
215 tp, min_threads, max_threads));
218 tp->min_threads = min_threads;
219 tp->max_threads = max_threads;
222 if (!silc_mutex_alloc(&tp->lock)) {
223 silc_sfree(stack, tp);
224 silc_stack_free(stack);
228 if (!silc_cond_alloc(&tp->pool_signal)) {
229 silc_mutex_free(tp->lock);
230 silc_sfree(stack, tp);
231 silc_stack_free(stack);
235 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
236 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
237 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
239 for (i = 0; i < tp->min_threads && start_min_threads; i++)
240 silc_thread_pool_new_thread(tp);
245 /* Free thread pool */
247 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
249 SilcThreadPoolThread t;
251 SILC_LOG_DEBUG(("Free thread pool %p", tp));
253 silc_mutex_lock(tp->lock);
257 silc_list_start(tp->threads);
258 while ((t = silc_list_get(tp->threads)))
260 silc_cond_broadcast(tp->pool_signal);
262 if (wait_unfinished) {
263 SILC_LOG_DEBUG(("Wait threads to finish"));
264 while (silc_list_count(tp->threads))
265 silc_cond_wait(tp->pool_signal, tp->lock);
268 /* Free calls from queue */
269 silc_list_start(tp->queue);
270 while ((t = silc_list_get(tp->queue)))
271 silc_sfree(tp->stack, t);
272 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
274 /* Release reference. Releases lock also. */
275 silc_thread_pool_unref(tp);
278 /* Execute code in a thread in the pool */
280 SilcBool silc_thread_pool_run(SilcThreadPool tp,
282 SilcSchedule schedule,
283 SilcThreadPoolFunc run,
285 SilcTaskCallback completion,
286 void *completion_context)
288 SilcThreadPoolThread t;
290 silc_mutex_lock(tp->lock);
293 silc_mutex_unlock(tp->lock);
297 /* Get free thread */
298 silc_list_start(tp->free_threads);
299 t = silc_list_get(tp->free_threads);
301 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
302 /* Maximum threads reached */
304 silc_mutex_unlock(tp->lock);
308 SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
310 /* User wants to queue this call until thread becomes free */
311 t = silc_scalloc(tp->stack, 1, sizeof(*t));
313 silc_mutex_unlock(tp->lock);
318 t->run_context = run_context;
319 t->completion = completion;
320 t->completion_context = completion_context;
321 t->schedule = schedule;
323 silc_list_add(tp->queue, t);
324 silc_mutex_unlock(tp->lock);
327 /* Create new thread */
328 t = silc_thread_pool_new_thread(tp);
330 silc_mutex_unlock(tp->lock);
336 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
338 /* Mark this call to be executed in this thread */
340 t->run_context = run_context;
341 t->completion = completion;
342 t->completion_context = completion_context;
343 t->schedule = schedule;
344 silc_list_del(tp->free_threads, t);
347 silc_cond_broadcast(tp->pool_signal);
349 silc_mutex_unlock(tp->lock);
353 /* Set maximum threads in the pool */
355 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
356 SilcUInt32 max_threads)
358 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
360 silc_mutex_lock(tp->lock);
361 tp->max_threads = max_threads;
362 silc_mutex_unlock(tp->lock);
365 /* Get maximum threads in the pool */
367 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
369 SilcUInt32 max_threads;
371 silc_mutex_lock(tp->lock);
372 max_threads = tp->max_threads;
373 silc_mutex_unlock(tp->lock);
378 /* Get numnber of free threads in the pool */
380 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
382 SilcUInt32 free_threads;
384 silc_mutex_lock(tp->lock);
385 free_threads = silc_list_count(tp->free_threads);
386 silc_mutex_unlock(tp->lock);
393 void silc_thread_pool_purge(SilcThreadPool tp)
395 SilcThreadPoolThread t;
398 silc_mutex_lock(tp->lock);
400 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
401 SILC_LOG_DEBUG(("No threads to purge"));
402 silc_mutex_unlock(tp->lock);
406 i = silc_list_count(tp->free_threads) - tp->min_threads;
408 SILC_LOG_DEBUG(("Purge %d threads", i));
410 silc_list_start(tp->threads);
411 while ((t = silc_list_get(tp->threads))) {
416 silc_list_del(tp->free_threads, t);
423 /* Signal threads to stop */
424 silc_cond_broadcast(tp->pool_signal);
426 silc_mutex_unlock(tp->lock);