5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /************************** Types and definitions ***************************/
24 /* Thread pool thread context.  The same structure is also used for the
   entries of the pool's call queue and of the queue freelist (see the
   list initialisation in silc_thread_pool_alloc and the queueing code in
   silc_thread_pool_run). */
25 typedef struct SilcThreadPoolThreadStruct {
26 struct SilcThreadPoolThreadStruct *next; /* Link for the threads, queue and free_queue lists */
27 struct SilcThreadPoolThreadStruct *next2; /* Link for the free_threads list */
28 SilcThreadPool tp; /* The thread pool */
29 SilcSchedule schedule; /* Scheduler, may be NULL */
30 SilcThreadPoolFunc run; /* The function to run in a thread */
31 SilcTaskCallback completion; /* Completion function */
/* NOTE(review): other code in this file accesses a run_context member that
   is handed to `run'; its declaration is not visible here -- confirm. */
33 void *completion_context; /* Context handed to the completion function */
34 unsigned int stop : 1; /* Set to stop the thread */
35 } *SilcThreadPoolThread;
37 /* Thread pool context */
38 struct SilcThreadPoolStruct {
39 SilcStack stack; /* Stack for memory allocation */
40 SilcMutex lock; /* Pool lock */
41 SilcCond pool_signal; /* Condition variable for signalling; idle threads
   wait on it, and so does silc_thread_pool_free when
   asked to wait for the threads to finish */
42 SilcList threads; /* Threads in the pool */
43 SilcList free_threads; /* Threads freelist */
44 SilcList queue; /* Queue for waiting calls */
45 SilcList free_queue; /* Queue freelist */
/* NOTE(review): the two limits below are SilcUInt16 while the public API
   (e.g. silc_thread_pool_alloc) takes SilcUInt32 values and assigns them
   directly -- values that do not fit would not survive the assignment;
   confirm the intended range. */
46 SilcUInt16 min_threads; /* Minimum threads in the pool */
47 SilcUInt16 max_threads; /* Maximum threads in the pool */
48 SilcUInt16 refcnt; /* Reference counter; one reference is taken for
   every thread started by
   silc_thread_pool_new_thread */
49 unsigned int destroy : 1; /* Set when pool is to be destroyed */
52 /************************ Static utility functions **************************/
54 /* Reference thread pool. Must be called locked.  The debug message
   reports the transition as (refcnt - 1) -> refcnt, i.e. it is written to
   be logged after the counter has been updated.
   NOTE(review): the statement that updates the counter is not visible
   here -- confirm. */
56 static void silc_thread_pool_ref(SilcThreadPool tp)
59 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
63 /* Unreference thread pool. Must be called locked. Releases the lock.
   Both paths visible below unlock tp->lock; the teardown path additionally
   frees the lock, the condition variable, the pool itself and its stack.
   NOTE(review): the condition selecting the teardown path is not visible
   here -- presumably the reference count reaching zero; confirm. */
65 static void silc_thread_pool_unref(SilcThreadPool tp)
68 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
71 SilcStack stack = tp->stack; /* tp is freed below; keep the stack handle */
72 silc_mutex_unlock(tp->lock); /* Unlock before the mutex is freed */
73 silc_mutex_free(tp->lock);
74 silc_cond_free(tp->pool_signal);
75 silc_sfree(stack, tp); /* tp must not be dereferenced after this */
76 silc_stack_free(stack);
79 silc_mutex_unlock(tp->lock);
82 /* The thread executor. Each thread in the pool is run here. They wait
83 here for something to do which is given to them by silc_thread_pool_run.

   `context' is the SilcThreadPoolThread that silc_thread_pool_new_thread
   hands to silc_thread_create.  The pool lock is held while waiting on the
   condition variable and while the pool lists are modified; it is released
   before the user function is called and taken again afterwards.  On the
   stop path the thread removes itself from the threads list, frees its
   context, wakes a waiting silc_thread_pool_free if it was the last thread
   and drops its pool reference. */
85 static void *silc_thread_pool_run_thread(void *context)
87 SilcThreadPoolThread t = context, q;
88 SilcThreadPool tp = t->tp;
89 SilcMutex lock = tp->lock;
90 SilcCond pool_signal = tp->pool_signal;
92 silc_mutex_lock(lock);
95 /* Wait here for code to execute */
96 while (!t->run && !t->stop)
97 silc_cond_wait(pool_signal, lock);
100 /* Stop the thread. Remove from threads list and free memory. */
101 SILC_LOG_DEBUG(("Stop thread %p", t));
102 silc_list_del(tp->threads, t);
103 silc_sfree(tp->stack, t); /* t must not be used after this point */
105 /* If we are last thread, signal the waiting destructor. */
106 if (silc_list_count(tp->threads) == 0)
107 silc_cond_broadcast(pool_signal);
109 /* Release pool reference. Releases lock also. */
110 silc_thread_pool_unref(tp);
113 silc_mutex_unlock(lock); /* The user function runs without the pool lock */
116 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
118 t->run(t->schedule, t->run_context);
120 /* If scheduler is NULL, call completion directly from here. Otherwise
121 it is called through the scheduler in the thread where the scheduler
125 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
126 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
127 t->completion_context, 0, 0)) {
128 SILC_LOG_DEBUG(("Run completion directly"));
129 t->completion(NULL, NULL, 0, 0, t->completion_context);
131 silc_schedule_wakeup(t->schedule);
133 SILC_LOG_DEBUG(("Run completion directly"));
134 t->completion(NULL, NULL, 0, 0, t->completion_context);
138 silc_mutex_lock(lock);
140 /* Check if there are calls in queue */
141 if (silc_list_count(tp->queue) > 0) {
142 silc_list_start(tp->queue);
143 q = silc_list_get(tp->queue); /* First entry of the queue */
145 SILC_LOG_DEBUG(("Execute call from queue"));
147 /* Execute this call now */
149 t->run_context = q->run_context;
150 t->completion = q->completion;
151 t->completion_context = q->completion_context;
152 t->schedule = q->schedule;
154 silc_list_del(tp->queue, q);
155 silc_list_add(tp->free_queue, q); /* Keep the queue entry for reuse */
159 /* The thread is now free for use again. */
161 t->completion = NULL;
163 silc_list_add(tp->free_threads, t);
169 /* Creates new thread to thread pool.  The thread context is allocated
   from the pool's stack, linked into both the threads list and the
   free_threads list, a pool reference is taken on behalf of the thread and
   the thread is started in silc_thread_pool_run_thread with the context as
   its argument.
   NOTE(review): the result of silc_thread_create is not checked in the
   code visible here -- confirm how a failed thread start is handled. */
171 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
173 SilcThreadPoolThread t;
175 t = silc_scalloc(tp->stack, 1, sizeof(*t));
179 silc_list_add(tp->threads, t);
180 silc_list_add(tp->free_threads, t);
181 silc_thread_pool_ref(tp);
183 SILC_LOG_DEBUG(("Start thread %p", t));
185 /* Start the thread */
186 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
191 /**************************** Thread Pool API *******************************/
193 /* Allocate thread pool.

   stack             - handed to silc_stack_alloc to create the stack the
                       pool and its threads are allocated from.
   min_threads       - stored as the pool's minimum thread count.
   max_threads       - stored as the pool's maximum thread count; it is
                       compared against min_threads before anything is
                       allocated.
   start_min_threads - when TRUE, min_threads threads are started here.

   On the failure paths visible below, everything allocated up to that
   point is released again. */
195 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
196 SilcUInt32 min_threads,
197 SilcUInt32 max_threads,
198 SilcBool start_min_threads)
203 if (max_threads < min_threads)
207 stack = silc_stack_alloc(0, stack);
209 tp = silc_scalloc(stack, 1, sizeof(*tp));
211 silc_stack_free(stack);
215 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
216 tp, min_threads, max_threads));
/* NOTE(review): SilcUInt32 arguments are stored into SilcUInt16 fields
   here without a range check -- confirm the intended limits. */
219 tp->min_threads = min_threads;
220 tp->max_threads = max_threads;
223 if (!silc_mutex_alloc(&tp->lock)) {
224 silc_sfree(stack, tp);
225 silc_stack_free(stack);
229 if (!silc_cond_alloc(&tp->pool_signal)) {
230 silc_mutex_free(tp->lock);
231 silc_sfree(stack, tp);
232 silc_stack_free(stack);
/* free_threads is linked through `next2', the other lists through `next',
   so a thread context can sit on threads and free_threads at once. */
236 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
237 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
238 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
239 silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
241 for (i = 0; i < tp->min_threads && start_min_threads; i++)
242 silc_thread_pool_new_thread(tp);
247 /* Free thread pool.  Takes the pool lock, walks the threads list and
   wakes every waiting thread with a broadcast.  When wait_unfinished is
   set, waits on the condition variable until the threads list is empty.
   The entries of the call queue and of the queue freelist are then freed
   and both lists re-initialised.  Finally the pool reference is dropped
   with silc_thread_pool_unref, which also releases the lock taken here. */
249 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
251 SilcThreadPoolThread t;
253 SILC_LOG_DEBUG(("Free thread pool %p", tp));
255 silc_mutex_lock(tp->lock);
259 silc_list_start(tp->threads);
260 while ((t = silc_list_get(tp->threads))) /* NOTE(review): loop body not visible here; presumably flags each thread to stop -- confirm */
262 silc_cond_broadcast(tp->pool_signal);
264 if (wait_unfinished) {
265 SILC_LOG_DEBUG(("Wait threads to finish"));
266 while (silc_list_count(tp->threads))
267 silc_cond_wait(tp->pool_signal, tp->lock); /* Woken by the broadcast of the last exiting thread */
270 /* Free calls from queue */
271 silc_list_start(tp->queue);
272 while ((t = silc_list_get(tp->queue)))
273 silc_sfree(tp->stack, t);
274 silc_list_start(tp->free_queue);
275 while ((t = silc_list_get(tp->free_queue)))
276 silc_sfree(tp->stack, t);
/* The entries were freed above; reset both lists so they are empty. */
277 silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
278 silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
280 /* Release reference. Releases lock also. */
281 silc_thread_pool_unref(tp);
284 /* Execute code in a thread in the pool.

   tp                 - the thread pool.
   schedule           - scheduler stored with the call; the executor thread
                        passes it to `run' and uses it to deliver the
                        completion when it is set.
   run                - function to execute in a pool thread.
   completion         - completion function stored with the call.
   completion_context - context handed to `completion'.

   The call is handed to a free thread or to a newly created one, or, when
   the maximum thread count has been reached, may be placed on the pool
   queue.  The pool lock is taken at the start and released on every exit
   path visible below.

   NOTE(review): the body also uses `run_context', which is not among the
   parameters visible here, and several branch conditions are not visible
   either -- confirm against the full signature and body. */
286 SilcBool silc_thread_pool_run(SilcThreadPool tp,
288 SilcSchedule schedule,
289 SilcThreadPoolFunc run,
291 SilcTaskCallback completion,
292 void *completion_context)
294 SilcThreadPoolThread t;
296 silc_mutex_lock(tp->lock);
299 silc_mutex_unlock(tp->lock);
303 /* Get free thread */
304 silc_list_start(tp->free_threads);
305 t = silc_list_get(tp->free_threads);
307 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
308 /* Maximum threads reached */
310 silc_mutex_unlock(tp->lock);
314 SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
316 /* User wants to queue this call until thread becomes free */
317 silc_list_start(tp->free_queue);
318 t = silc_list_get(tp->free_queue); /* Try to reuse an entry from the queue freelist */
320 t = silc_scalloc(tp->stack, 1, sizeof(*t)); /* Allocate a fresh queue entry */
322 silc_mutex_unlock(tp->lock);
326 silc_list_del(tp->free_queue, t);
330 t->run_context = run_context;
331 t->completion = completion;
332 t->completion_context = completion_context;
333 t->schedule = schedule;
335 silc_list_add(tp->queue, t);
336 silc_mutex_unlock(tp->lock);
339 /* Create new thread */
340 t = silc_thread_pool_new_thread(tp);
342 silc_mutex_unlock(tp->lock);
348 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
350 /* Mark this call to be executed in this thread */
352 t->run_context = run_context;
353 t->completion = completion;
354 t->completion_context = completion_context;
355 t->schedule = schedule;
356 silc_list_del(tp->free_threads, t); /* The thread is no longer idle */
359 silc_cond_broadcast(tp->pool_signal); /* Every waiter wakes and re-checks its own run/stop fields */
361 silc_mutex_unlock(tp->lock);
365 /* Set maximum threads in the pool */
367 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
368 SilcUInt32 max_threads)
370 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
372 silc_mutex_lock(tp->lock);
373 tp->max_threads = max_threads;
374 silc_mutex_unlock(tp->lock);
377 /* Get maximum threads in the pool.  The value of tp->max_threads is
   copied into a local while the pool lock is held.
   NOTE(review): the return statement is not visible here; presumably the
   local copy is returned -- confirm. */
379 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
381 SilcUInt32 max_threads;
383 silc_mutex_lock(tp->lock);
384 max_threads = tp->max_threads;
385 silc_mutex_unlock(tp->lock);
390 /* Get number of free threads in the pool.  The count of the
   free_threads list is taken while the pool lock is held.
   NOTE(review): the return statement is not visible here; presumably the
   local copy is returned -- confirm. */
392 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
394 SilcUInt32 free_threads;
396 silc_mutex_lock(tp->lock);
397 free_threads = silc_list_count(tp->free_threads);
398 silc_mutex_unlock(tp->lock);
/* Purge the pool of surplus idle threads.  If the number of free threads
   does not exceed min_threads, nothing is done.  Otherwise the surplus
   count is computed, the threads list is walked and threads are taken off
   the free_threads list, and all waiting threads are woken with a
   broadcast.  The pool lock is held throughout.
   NOTE(review): the statements that select which threads to stop and set
   their stop flag are not visible here, nor is the declaration of `i' --
   confirm against the full body. */
405 void silc_thread_pool_purge(SilcThreadPool tp)
407 SilcThreadPoolThread t;
410 silc_mutex_lock(tp->lock);
412 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
413 SILC_LOG_DEBUG(("No threads to purge"));
414 silc_mutex_unlock(tp->lock);
418 i = silc_list_count(tp->free_threads) - tp->min_threads; /* Free threads above the configured minimum */
420 SILC_LOG_DEBUG(("Purge %d threads", i));
422 silc_list_start(tp->threads);
423 while ((t = silc_list_get(tp->threads))) {
428 silc_list_del(tp->free_threads, t);
435 /* Signal threads to stop */
436 silc_cond_broadcast(tp->pool_signal);
438 silc_mutex_unlock(tp->lock);