5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /* Explanation of the thread pool execution.
24 When a new call is added to the thread pool by calling silc_thread_pool_run
25 it is assigned to the first free thread from the free list. If no threads
26 are available we take one from the threads list and assign the call to
27 its queue. The threads list cycles to a different thread each time, finally
28 wrapping around to the beginning. This way each thread will get a chance to execute
31 The thread function silc_thread_pool_run_thread executes each call. After
32 executing the current call that has been assigned to it, it will check
33 if there are any queued calls in its queue, and it will execute all calls
34 from the queue. If there aren't any calls in the queue, it will attempt
35 to steal a call from some other thread and execute it.
37 The queue list is always consumed in last-in-first-out order. The most
38 recently added call gets priority. With full utilization this helps to
39 avoid CPU cache misses. Since the queues are thread specific with full
40 utilization each thread should always be doing work for the most recent
41 (and thus most important) calls. */
43 /************************** Types and definitions ***************************/
45 /* Thread pool thread context. Each thread contains the most current call
46 to be executed, and a list of queued calls. */
47 typedef struct SilcThreadPoolThreadStruct {
48 struct SilcThreadPoolThreadStruct *next;
49 struct SilcThreadPoolThreadStruct *next2;
50 SilcThreadPool tp; /* The thread pool */
51 SilcCond thread_signal; /* Condition variable for signalling */
52 SilcMutex lock; /* Thread lock */
53 SilcList queue; /* Queue for waiting calls */
54 SilcList free_queue; /* Queue freelist */
55 SilcSchedule schedule; /* The current Scheduler, may be NULL */
56 SilcThreadPoolFunc run; /* The current call to run in a thread */
57 SilcTaskCallback completion; /* The current Completion function */
59 void *completion_context;
60 unsigned int stop : 1; /* Set to stop the thread */
61 } *SilcThreadPoolThread;
63 /* Thread pool context */
64 struct SilcThreadPoolStruct {
65 SilcStack stack; /* Stack for memory allocation */
66 SilcCond pool_signal; /* Condition variable for signalling */
67 SilcMutex lock; /* Pool lock */
68 SilcList threads; /* Threads in the pool */
69 SilcList free_threads; /* Threads freelist */
70 SilcUInt16 min_threads; /* Minimum threads in the pool */
71 SilcUInt16 max_threads; /* Maximum threads in the pool */
72 SilcUInt16 refcnt; /* Reference counter */
73 unsigned int destroy : 1; /* Set when pool is to be destroyed */
76 /************************ Static utility functions **************************/
78 /* Reference thread pool. Must be called locked. */
80 static void silc_thread_pool_ref(SilcThreadPool tp)
83 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
87 /* Unreference thread pool. Must be called locked. Releases the lock. */
89 static void silc_thread_pool_unref(SilcThreadPool tp)
92 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
95 SilcStack stack = tp->stack;
96 silc_mutex_unlock(tp->lock);
97 silc_mutex_free(tp->lock);
98 silc_cond_free(tp->pool_signal);
99 silc_sfree(stack, tp);
100 silc_stack_free(stack);
103 silc_mutex_unlock(tp->lock);
106 /* The thread executor. Each thread in the pool is run here. They wait
107 here for something to do which is given to them by silc_thread_pool_run. */
109 static void *silc_thread_pool_run_thread(void *context)
111 SilcThreadPoolThread t = context, o, q;
112 SilcThreadPool tp = t->tp;
113 SilcMutex lock = t->lock;
114 SilcCond thread_signal = t->thread_signal;
116 silc_mutex_lock(lock);
119 /* Wait here for code to execute */
120 while (!t->run && !t->stop)
121 silc_cond_wait(thread_signal, lock);
127 silc_mutex_unlock(lock);
129 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
131 t->run(t->schedule, t->run_context);
133 /* If scheduler is NULL, call completion directly from here. Otherwise
134 it is called through the scheduler in the thread where the scheduler
138 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
139 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
140 t->completion_context, 0, 0)) {
141 SILC_LOG_DEBUG(("Run completion directly"));
142 t->completion(NULL, NULL, 0, 0, t->completion_context);
144 silc_schedule_wakeup(t->schedule);
146 SILC_LOG_DEBUG(("Run completion directly"));
147 t->completion(NULL, NULL, 0, 0, t->completion_context);
151 silc_mutex_lock(lock);
155 /* Check if there are calls in queue. Takes the most recently added
156 call since new ones are added at the start of the list. */
157 if (silc_list_count(t->queue) > 0) {
159 silc_list_start(t->queue);
160 q = silc_list_get(t->queue);
162 SILC_LOG_DEBUG(("Execute call from queue"));
164 /* Execute this call now */
166 t->run_context = q->run_context;
167 t->completion = q->completion;
168 t->completion_context = q->completion_context;
169 t->schedule = q->schedule;
171 silc_list_del(t->queue, q);
172 silc_list_add(t->free_queue, q);
173 silc_mutex_unlock(lock);
177 silc_mutex_unlock(lock);
178 silc_mutex_lock(tp->lock);
180 /* Nothing to do. Attempt to steal call from some other thread. */
181 o = silc_list_get(tp->threads);
183 /* List wraps around */
184 silc_list_start(tp->threads);
185 o = silc_list_get(tp->threads);
188 /* Check that the other thread is valid and has something to execute. */
189 silc_mutex_lock(o->lock);
190 if (o == t || o->stop || silc_list_count(o->queue) == 0) {
191 silc_mutex_unlock(o->lock);
196 silc_mutex_unlock(tp->lock);
197 silc_list_start(o->queue);
198 q = silc_list_get(o->queue);
200 SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
202 /* Execute this call now */
204 t->run_context = q->run_context;
205 t->completion = q->completion;
206 t->completion_context = q->completion_context;
207 t->schedule = q->schedule;
209 silc_list_del(o->queue, q);
210 silc_list_add(o->free_queue, q);
211 silc_mutex_unlock(o->lock);
215 silc_mutex_lock(lock);
217 silc_mutex_unlock(tp->lock);
221 /* Now that we have the lock back, check the queue again. */
222 if (silc_list_count(t->queue) > 0) {
223 silc_mutex_unlock(tp->lock);
227 /* The thread is now free for use again. */
229 t->completion = NULL;
231 silc_list_add(tp->free_threads, t);
232 silc_mutex_unlock(tp->lock);
236 /* Stop the thread. Remove from threads list. */
237 SILC_LOG_DEBUG(("Stop thread %p", t));
239 /* We can unlock the thread now. After we get the thread pool lock
240 no one can retrieve the thread anymore. */
241 silc_mutex_unlock(lock);
242 silc_mutex_lock(tp->lock);
244 silc_list_del(tp->threads, t);
245 silc_list_start(tp->threads);
247 /* Clear thread's call queue. */
248 silc_list_start(t->queue);
249 silc_list_start(t->free_queue);
250 while ((q = silc_list_get(t->queue)))
251 silc_sfree(tp->stack, q);
252 while ((q = silc_list_get(t->free_queue)))
253 silc_sfree(tp->stack, q);
255 /* Destroy the thread */
256 silc_mutex_free(lock);
257 silc_cond_free(thread_signal);
258 silc_sfree(tp->stack, t);
260 /* If we are last thread, signal the waiting destructor. */
261 if (silc_list_count(tp->threads) == 0)
262 silc_cond_signal(tp->pool_signal);
264 /* Release pool reference. Releases lock also. */
265 silc_thread_pool_unref(tp);
270 /* Creates new thread to thread pool */
272 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
274 SilcThreadPoolThread t;
276 t = silc_scalloc(tp->stack, 1, sizeof(*t));
280 if (!silc_mutex_alloc(&t->lock)) {
281 silc_sfree(tp->stack, t);
285 if (!silc_cond_alloc(&t->thread_signal)) {
286 silc_mutex_free(t->lock);
287 silc_sfree(tp->stack, t);
292 silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
293 silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
295 /* Add to thread pool */
296 silc_list_add(tp->threads, t);
297 silc_list_add(tp->free_threads, t);
298 silc_thread_pool_ref(tp);
300 SILC_LOG_DEBUG(("Start thread %p", t));
302 /* Start the thread */
303 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
308 /**************************** Thread Pool API *******************************/
310 /* Allocate thread pool */
312 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
313 SilcUInt32 min_threads,
314 SilcUInt32 max_threads,
315 SilcBool start_min_threads)
320 if (max_threads < min_threads)
326 stack = silc_stack_alloc(0, stack);
328 tp = silc_scalloc(stack, 1, sizeof(*tp));
330 silc_stack_free(stack);
334 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
335 tp, min_threads, max_threads));
338 tp->min_threads = min_threads;
339 tp->max_threads = max_threads;
342 if (!silc_mutex_alloc(&tp->lock)) {
343 silc_sfree(stack, tp);
344 silc_stack_free(stack);
348 if (!silc_cond_alloc(&tp->pool_signal)) {
349 silc_mutex_free(tp->lock);
350 silc_sfree(stack, tp);
351 silc_stack_free(stack);
355 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
356 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
358 for (i = 0; i < tp->min_threads && start_min_threads; i++)
359 silc_thread_pool_new_thread(tp);
361 silc_list_start(tp->threads);
366 /* Free thread pool */
368 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
370 SilcThreadPoolThread t;
372 SILC_LOG_DEBUG(("Free thread pool %p", tp));
374 silc_mutex_lock(tp->lock);
378 silc_list_start(tp->threads);
379 while ((t = silc_list_get(tp->threads))) {
380 silc_mutex_lock(t->lock);
382 silc_cond_signal(t->thread_signal);
383 silc_mutex_unlock(t->lock);
386 if (wait_unfinished) {
387 SILC_LOG_DEBUG(("Wait threads to finish"));
388 while (silc_list_count(tp->threads))
389 silc_cond_wait(tp->pool_signal, tp->lock);
392 /* Release reference. Releases lock also. */
393 silc_thread_pool_unref(tp);
396 /* Execute code in a thread in the pool */
398 SilcBool silc_thread_pool_run(SilcThreadPool tp,
400 SilcSchedule schedule,
401 SilcThreadPoolFunc run,
403 SilcTaskCallback completion,
404 void *completion_context)
406 SilcThreadPoolThread t, q;
408 silc_mutex_lock(tp->lock);
411 silc_mutex_unlock(tp->lock);
415 /* Get free thread */
416 silc_list_start(tp->free_threads);
417 t = silc_list_get(tp->free_threads);
419 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
420 /* Maximum threads reached */
422 silc_mutex_unlock(tp->lock);
426 /* User wants to queue this call until thread becomes free. Get
427 a thread to assign this call. */
428 t = silc_list_get(tp->threads);
430 /* List wraps around */
431 silc_list_start(tp->threads);
432 t = silc_list_get(tp->threads);
434 silc_mutex_unlock(tp->lock);
436 SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
437 run, run_context, t));
439 silc_mutex_lock(t->lock);
441 /* Get free call context from the list */
442 silc_list_start(t->free_queue);
443 q = silc_list_get(t->free_queue);
445 q = silc_scalloc(tp->stack, 1, sizeof(*q));
447 silc_mutex_unlock(t->lock);
451 silc_list_del(t->free_queue, q);
455 q->run_context = run_context;
456 q->completion = completion;
457 q->completion_context = completion_context;
458 q->schedule = schedule;
460 /* Add at the start of the list. It gets executed first. */
461 silc_list_insert(t->queue, NULL, q);
462 silc_mutex_unlock(t->lock);
465 /* Create new thread */
466 t = silc_thread_pool_new_thread(tp);
468 silc_mutex_unlock(tp->lock);
474 silc_list_del(tp->free_threads, t);
475 silc_mutex_unlock(tp->lock);
477 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
479 silc_mutex_lock(t->lock);
481 /* Mark this call to be executed in this thread */
483 t->run_context = run_context;
484 t->completion = completion;
485 t->completion_context = completion_context;
486 t->schedule = schedule;
488 /* Signal the thread */
489 silc_cond_signal(t->thread_signal);
490 silc_mutex_unlock(t->lock);
495 /* Set maximum threads in the pool */
497 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
498 SilcUInt32 max_threads)
500 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
502 silc_mutex_lock(tp->lock);
503 tp->max_threads = max_threads;
504 silc_mutex_unlock(tp->lock);
507 /* Get maximum threads in the pool */
509 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
511 SilcUInt32 max_threads;
513 silc_mutex_lock(tp->lock);
514 max_threads = tp->max_threads;
515 silc_mutex_unlock(tp->lock);
520 /* Get number of free threads in the pool */
522 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
524 SilcUInt32 free_threads;
526 silc_mutex_lock(tp->lock);
527 free_threads = silc_list_count(tp->free_threads);
528 silc_mutex_unlock(tp->lock);
535 void silc_thread_pool_purge(SilcThreadPool tp)
537 SilcThreadPoolThread t;
540 silc_mutex_lock(tp->lock);
542 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
543 SILC_LOG_DEBUG(("No threads to purge"));
544 silc_mutex_unlock(tp->lock);
548 i = silc_list_count(tp->free_threads) - tp->min_threads;
550 SILC_LOG_DEBUG(("Purge %d threads", i));
552 silc_list_start(tp->threads);
553 while ((t = silc_list_get(tp->threads))) {
554 silc_mutex_lock(t->lock);
556 silc_mutex_unlock(t->lock);
560 /* Signal the thread to stop */
562 silc_cond_signal(t->thread_signal);
563 silc_mutex_unlock(t->lock);
565 silc_list_del(tp->free_threads, t);
572 silc_list_start(tp->threads);
573 silc_mutex_unlock(tp->lock);