5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /* Explanation of the thread pool execution.
24 When a new call is added to the thread pool by calling silc_thread_pool_run
25 it is assigned to the first free thread from the free list. If no threads
26 are available we take one from the threads list and assign the call to
27 its queue. Each such pick takes the next thread in the threads list, finally
28 wrapping around to the beginning. This way each thread will get a chance to execute queued calls.
31 The thread function silc_thread_pool_run_thread executes each call. After
32 executing the current call that has been assigned to it, it will check
33 if there are any queued calls in its queue, and it will execute all calls
34 from the queue. If there aren't any calls in the queue, it will attempt
35 to steal a call from some other thread and execute it.
37 The queue list is always consumed in last-in-first-out order. The most
38 recently added call gets priority. With full utilization this helps to
39 avoid CPU cache misses. Since the queues are thread specific with full
40 utilization each thread should always be doing work for the most recent
41 (and thus most important) calls. */
43 /************************** Types and definitions ***************************/
45 /* Thread pool thread context. Each thread contains the most current call
46 to be executed, and a list of queued calls. */
/* Objects of this type are also allocated as queued-call entries: the
   per-thread `queue' and `free_queue' lists link their entries through
   `next', and an entry's run_context, completion, completion_context and
   schedule are copied into the thread context when the call is taken from
   a queue.  The pool's `threads' list links thread contexts through `next'
   and its `free_threads' list links them through `next2'.
   NOTE(review): code in this file reads and writes a `run_context' member,
   but no declaration of it is visible in this excerpt -- confirm. */
47 typedef struct SilcThreadPoolThreadStruct {
48 struct SilcThreadPoolThreadStruct *next; /* Link for `threads' list and call queues */
49 struct SilcThreadPoolThreadStruct *next2; /* Link for `free_threads' list */
50 SilcThreadPool tp; /* The thread pool */
51 SilcCond thread_signal; /* Condition variable for signalling */
52 SilcMutex lock; /* Thread lock */
53 SilcList queue; /* Queue for waiting calls */
54 SilcList free_queue; /* Queue freelist */
55 SilcSchedule schedule; /* The current Scheduler, may be NULL */
56 SilcThreadPoolFunc run; /* The current call to run in a thread */
57 SilcTaskCallback completion; /* The current Completion function */
59 void *completion_context; /* Context passed to the completion function */
60 unsigned int stop : 1; /* Set to stop the thread */
61 } *SilcThreadPoolThread;
63 /* Thread pool context */
/* The pool context itself is allocated from `stack'.  A reference is taken
   (silc_thread_pool_ref) for every thread added to the pool; each stopping
   thread and silc_thread_pool_free release one (silc_thread_pool_unref).
   NOTE(review): min_threads and max_threads are 16-bit here while the
   public API passes SilcUInt32 values that are assigned without a visible
   range check -- confirm that truncation is acceptable.
   NOTE(review): the closing of this struct is not visible in this
   excerpt. */
64 struct SilcThreadPoolStruct {
65 SilcStack stack; /* Stack for memory allocation */
66 SilcCond pool_signal; /* Condition variable for signalling */
67 SilcMutex lock; /* Pool lock */
68 SilcList threads; /* Threads in the pool */
69 SilcList free_threads; /* Threads freelist */
70 SilcUInt16 min_threads; /* Minimum threads in the pool */
71 SilcUInt16 max_threads; /* Maximum threads in the pool */
72 SilcUInt16 refcnt; /* Reference counter */
73 unsigned int destroy : 1; /* Set when pool is to be destroyed */
76 /************************ Static utility functions **************************/
78 /* Reference thread pool. Must be called locked. */
/* NOTE(review): only the debug statement is visible in this excerpt.  It
   prints the transition as `refcnt - 1 -> refcnt', which suggests the
   counter has already been incremented by the time it runs -- confirm
   against the complete file. */
80 static void silc_thread_pool_ref(SilcThreadPool tp)
83 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
87 /* Unreference thread pool. Must be called locked. Releases the lock. */
/* Two ways out are visible below: a teardown path that unlocks the pool
   lock and then frees the mutex, the condition variable, the pool context
   and finally the stack, and a plain unlock of the pool lock.
   NOTE(review): the decrement and the test that selects the teardown path
   are not visible in this excerpt; presumably teardown happens when refcnt
   reaches zero -- confirm.  After the teardown path `tp' must not be used
   by the caller, since it has been freed. */
89 static void silc_thread_pool_unref(SilcThreadPool tp)
92 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
95 SilcStack stack = tp->stack; /* Saved first: tp itself is freed before the stack */
96 silc_mutex_unlock(tp->lock);
97 silc_mutex_free(tp->lock);
98 silc_cond_free(tp->pool_signal);
99 silc_sfree(stack, tp);
100 silc_stack_free(stack);
103 silc_mutex_unlock(tp->lock);
106 /* The thread executor. Each thread in the pool is run here. They wait
107 here for something to do which is given to them by silc_thread_pool_run. */
/* `context' is the SilcThreadPoolThread of the thread being run.  The thread
   lock and condition variable are copied to locals at entry; the stop path
   frees `t' and uses those locals instead of t->lock / t->thread_signal.
   NOTE(review): loop constructs, labels and the return statements are not
   visible in this excerpt, so the exact control flow between the sections
   below should be confirmed against the complete file. */
109 static void *silc_thread_pool_run_thread(void *context)
111 SilcThreadPoolThread t = context, o, q;
112 SilcThreadPool tp = t->tp;
113 SilcMutex lock = t->lock;
114 SilcCond thread_signal = t->thread_signal;
116 silc_mutex_lock(lock);
119 /* Wait here for code to execute */
/* The predicate is re-tested after every wakeup, so the wait ends only when
   a call has been assigned or the thread has been told to stop. */
120 while (!t->run && !t->stop)
121 silc_cond_wait(thread_signal, lock);
123 if (silc_unlikely(t->stop)) {
124 /* Stop the thread. Remove from threads list. */
125 SILC_LOG_DEBUG(("Stop thread %p", t));
126 silc_mutex_lock(tp->lock);
127 silc_list_del(tp->threads, t);
128 silc_list_start(tp->threads);
130 /* Clear thread's call queue. */
/* Entries of both the pending queue and the queue freelist are released. */
131 silc_list_start(t->queue);
132 silc_list_start(t->free_queue);
133 while ((q = silc_list_get(t->queue)))
134 silc_sfree(tp->stack, q);
135 while ((q = silc_list_get(t->free_queue)))
136 silc_sfree(tp->stack, q);
138 /* Destroy the thread */
139 silc_mutex_unlock(lock);
140 silc_mutex_free(lock);
141 silc_cond_free(thread_signal);
142 silc_sfree(tp->stack, t);
144 /* If we are last thread, signal the waiting destructor. */
/* silc_thread_pool_free waits on pool_signal until the threads list is
   empty. */
145 if (silc_list_count(tp->threads) == 0)
146 silc_cond_signal(tp->pool_signal);
148 /* Release pool reference. Releases lock also. */
149 silc_thread_pool_unref(tp);
152 silc_mutex_unlock(lock);
156 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
158 t->run(t->schedule, t->run_context);
160 /* If scheduler is NULL, call completion directly from here. Otherwise
161 it is called through the scheduler in the thread where the scheduler
165 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
166 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
167 t->completion_context, 0, 0)) {
168 SILC_LOG_DEBUG(("Run completion directly"));
169 t->completion(NULL, NULL, 0, 0, t->completion_context);
171 silc_schedule_wakeup(t->schedule);
173 SILC_LOG_DEBUG(("Run completion directly"));
174 t->completion(NULL, NULL, 0, 0, t->completion_context);
178 /* Check if there are calls in queue. Takes the most recently added
179 call since new ones are added at the start of the list. */
180 silc_mutex_lock(lock);
181 if (silc_list_count(t->queue) > 0) {
183 silc_list_start(t->queue);
184 q = silc_list_get(t->queue);
186 SILC_LOG_DEBUG(("Execute call from queue"));
188 /* Execute this call now */
190 t->run_context = q->run_context;
191 t->completion = q->completion;
192 t->completion_context = q->completion_context;
193 t->schedule = q->schedule;
/* The consumed entry is recycled through the queue freelist rather than
   freed. */
195 silc_list_del(t->queue, q);
196 silc_list_add(t->free_queue, q);
197 silc_mutex_unlock(lock);
200 silc_mutex_unlock(lock);
202 /* Nothing to do. Attempt to steal call from some other thread. */
/* The candidate is whichever thread the pool's `threads' list yields next;
   the pool lock is released before the candidate's own lock is taken. */
203 silc_mutex_lock(tp->lock);
204 o = silc_list_get(tp->threads);
206 /* List wraps around */
207 silc_list_start(tp->threads);
208 o = silc_list_get(tp->threads);
210 silc_mutex_unlock(tp->lock);
213 silc_mutex_lock(o->lock);
214 if (silc_list_count(o->queue) > 0) {
215 silc_list_start(o->queue);
216 q = silc_list_get(o->queue);
218 SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
220 /* Execute this call now */
222 t->run_context = q->run_context;
223 t->completion = q->completion;
224 t->completion_context = q->completion_context;
225 t->schedule = q->schedule;
/* The stolen entry goes back to the other thread's freelist, not ours. */
227 silc_list_del(o->queue, q);
228 silc_list_add(o->free_queue, q);
229 silc_mutex_unlock(o->lock);
232 silc_mutex_unlock(o->lock);
236 silc_mutex_lock(lock);
238 /* Now that we have the lock back, check the queue again. */
239 if (silc_list_count(t->queue) > 0)
242 /* The thread is now free for use again. */
244 t->completion = NULL;
/* NOTE(review): elsewhere in this file tp->free_threads is manipulated with
   tp->lock held; the acquisition of tp->lock around this insertion is not
   visible in this excerpt -- confirm. */
246 silc_list_add(tp->free_threads, t);
252 /* Creates new thread to thread pool */
/* Allocates a thread context from the pool's stack together with its lock
   and condition variable, adds it to both the `threads' and `free_threads'
   lists, takes a pool reference for it and starts
   silc_thread_pool_run_thread on it.  If the mutex or the condition
   variable cannot be allocated, what was allocated so far is released.
   NOTE(review): the return statements are not visible in this excerpt;
   silc_thread_pool_run uses the result as the new thread context. */
254 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
256 SilcThreadPoolThread t;
258 t = silc_scalloc(tp->stack, 1, sizeof(*t));
262 if (!silc_mutex_alloc(&t->lock)) {
263 silc_sfree(tp->stack, t);
267 if (!silc_cond_alloc(&t->thread_signal)) {
268 silc_mutex_free(t->lock);
269 silc_sfree(tp->stack, t);
/* Both per-thread lists hold queued-call entries linked through `next'. */
274 silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
275 silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
277 /* Add to thread pool */
278 silc_list_add(tp->threads, t);
279 silc_list_add(tp->free_threads, t);
280 silc_thread_pool_ref(tp);
282 SILC_LOG_DEBUG(("Start thread %p", t));
284 /* Start the thread */
/* NOTE(review): the result of silc_thread_create is not checked on this
   line; if creation can fail, `t' would stay in both lists with no thread
   serving it -- confirm. */
285 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
290 /**************************** Thread Pool API *******************************/
292 /* Allocate thread pool */
/* Obtains a stack with silc_stack_alloc(0, stack) and allocates the pool
   context from it, then allocates the pool lock and condition variable and
   initializes the `threads' and `free_threads' lists.  When
   `start_min_threads' is set, `min_threads' threads are started
   immediately.  On the visible mutex and condition variable failure paths
   everything allocated so far is released.
   NOTE(review): local declarations, the return statements and the action
   taken when max_threads < min_threads are not visible in this excerpt.
   NOTE(review): min_threads/max_threads are SilcUInt32 here but stored in
   SilcUInt16 members -- confirm that values above 65535 cannot occur. */
294 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
295 SilcUInt32 min_threads,
296 SilcUInt32 max_threads,
297 SilcBool start_min_threads)
302 if (max_threads < min_threads)
308 stack = silc_stack_alloc(0, stack);
310 tp = silc_scalloc(stack, 1, sizeof(*tp));
/* NOTE(review): presumably this free is guarded by a failed-allocation test
   that is not visible in this excerpt -- confirm. */
312 silc_stack_free(stack);
316 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
317 tp, min_threads, max_threads));
320 tp->min_threads = min_threads;
321 tp->max_threads = max_threads;
324 if (!silc_mutex_alloc(&tp->lock)) {
325 silc_sfree(stack, tp);
326 silc_stack_free(stack);
330 if (!silc_cond_alloc(&tp->pool_signal)) {
331 silc_mutex_free(tp->lock);
332 silc_sfree(stack, tp);
333 silc_stack_free(stack);
/* Thread contexts are linked into `threads' through `next' and into
   `free_threads' through `next2', so one context can be on both lists. */
337 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
338 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
340 for (i = 0; i < tp->min_threads && start_min_threads; i++)
341 silc_thread_pool_new_thread(tp);
343 silc_list_start(tp->threads);
348 /* Free thread pool */
/* Wakes every thread in the pool by signalling its condition variable while
   holding that thread's lock.  If `wait_unfinished' is set, blocks on
   pool_signal until the threads list is empty.  Finally releases a pool
   reference, which also releases tp->lock.
   NOTE(review): the statements that mark the pool as being destroyed and
   set each thread's stop flag are not visible in this excerpt -- confirm. */
350 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
352 SilcThreadPoolThread t;
354 SILC_LOG_DEBUG(("Free thread pool %p", tp));
356 silc_mutex_lock(tp->lock);
360 silc_list_start(tp->threads);
361 while ((t = silc_list_get(tp->threads))) {
362 silc_mutex_lock(t->lock);
364 silc_cond_signal(t->thread_signal);
365 silc_mutex_unlock(t->lock);
368 if (wait_unfinished) {
369 SILC_LOG_DEBUG(("Wait threads to finish"));
/* A stopping thread signals pool_signal once the threads list has become
   empty; the count is re-tested after every wakeup. */
370 while (silc_list_count(tp->threads))
371 silc_cond_wait(tp->pool_signal, tp->lock);
374 /* Release reference. Releases lock also. */
375 silc_thread_pool_unref(tp);
378 /* Execute code in a thread in the pool */
/* Hands `run' to a free thread if one exists.  Otherwise, when the pool is
   already at max_threads, the call is placed at the head of some existing
   thread's queue; below that limit a new thread is created for it.  The
   chosen thread receives the call, its context, the completion callback
   with its context and the scheduler, and is then signalled.
   NOTE(review): two parameter lines (the body uses `run_context'), the
   branch conditions between the sections below and the return statements
   are not visible in this excerpt -- confirm against the complete file. */
380 SilcBool silc_thread_pool_run(SilcThreadPool tp,
382 SilcSchedule schedule,
383 SilcThreadPoolFunc run,
385 SilcTaskCallback completion,
386 void *completion_context)
388 SilcThreadPoolThread t, q;
390 silc_mutex_lock(tp->lock);
/* NOTE(review): the condition guarding this early unlock is not visible;
   presumably the pool is being destroyed -- confirm. */
393 silc_mutex_unlock(tp->lock);
397 /* Get free thread */
398 silc_list_start(tp->free_threads);
399 t = silc_list_get(tp->free_threads);
401 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
402 /* Maximum threads reached */
404 silc_mutex_unlock(tp->lock);
408 /* User wants to queue this call until thread becomes free. Get
409 a thread to assign this call. */
410 t = silc_list_get(tp->threads);
412 /* List wraps around */
413 silc_list_start(tp->threads);
414 t = silc_list_get(tp->threads);
416 silc_mutex_unlock(tp->lock);
418 SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
419 run, run_context, t));
421 silc_mutex_lock(t->lock);
423 /* Get free call context from the list */
/* A recycled entry from the thread's freelist is preferred; a new one is
   allocated from the pool's stack otherwise. */
424 silc_list_start(t->free_queue);
425 q = silc_list_get(t->free_queue);
427 q = silc_scalloc(tp->stack, 1, sizeof(*q));
429 silc_mutex_unlock(t->lock);
433 silc_list_del(t->free_queue, q);
437 q->run_context = run_context;
438 q->completion = completion;
439 q->completion_context = completion_context;
440 q->schedule = schedule;
442 /* Add at the start of the list. It gets executed first. */
443 silc_list_insert(t->queue, NULL, q);
444 silc_mutex_unlock(t->lock);
447 /* Create new thread */
448 t = silc_thread_pool_new_thread(tp);
450 silc_mutex_unlock(tp->lock);
/* The chosen thread is taken off the free list while the pool lock is
   still held; the call is then assigned under the thread's own lock. */
456 silc_list_del(tp->free_threads, t);
457 silc_mutex_unlock(tp->lock);
459 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
461 silc_mutex_lock(t->lock);
463 /* Mark this call to be executed in this thread */
465 t->run_context = run_context;
466 t->completion = completion;
467 t->completion_context = completion_context;
468 t->schedule = schedule;
470 /* Signal the thread */
471 silc_cond_signal(t->thread_signal);
472 silc_mutex_unlock(t->lock);
477 /* Set maximum threads in the pool */
/* Stores `max_threads' into the pool while holding the pool lock.
   NOTE(review): the parameter is SilcUInt32 but the member it is stored in
   is SilcUInt16, and no range check is visible -- confirm that values above
   65535 cannot occur. */
479 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
480 SilcUInt32 max_threads)
482 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
484 silc_mutex_lock(tp->lock);
485 tp->max_threads = max_threads;
486 silc_mutex_unlock(tp->lock);
489 /* Get maximum threads in the pool */
/* Reads tp->max_threads into a local while holding the pool lock.
   NOTE(review): the return statement is not visible in this excerpt;
   presumably the local copy is returned -- confirm. */
491 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
493 SilcUInt32 max_threads;
495 silc_mutex_lock(tp->lock);
496 max_threads = tp->max_threads;
497 silc_mutex_unlock(tp->lock);
502 /* Get number of free threads in the pool */
/* Counts the entries of the pool's free_threads list while holding the pool
   lock.
   NOTE(review): the return statement is not visible in this excerpt;
   presumably the local copy is returned -- confirm. */
504 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
506 SilcUInt32 free_threads;
508 silc_mutex_lock(tp->lock);
509 free_threads = silc_list_count(tp->free_threads);
510 silc_mutex_unlock(tp->lock);
/* Purge the pool of surplus free threads.  Nothing is done when the number
   of free threads does not exceed min_threads; otherwise the surplus
   `free count - min_threads' is computed and the threads list is walked
   with the pool lock held, signalling selected threads and removing them
   from the free list.
   NOTE(review): the declaration of `i', the test that skips a thread, the
   setting of its stop flag and the loop's termination on the surplus count
   are not visible in this excerpt -- confirm against the complete file. */
517 void silc_thread_pool_purge(SilcThreadPool tp)
519 SilcThreadPoolThread t;
522 silc_mutex_lock(tp->lock);
524 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
525 SILC_LOG_DEBUG(("No threads to purge"));
526 silc_mutex_unlock(tp->lock);
530 i = silc_list_count(tp->free_threads) - tp->min_threads;
532 SILC_LOG_DEBUG(("Purge %d threads", i));
534 silc_list_start(tp->threads);
535 while ((t = silc_list_get(tp->threads))) {
536 silc_mutex_lock(t->lock);
/* NOTE(review): presumably this unlock belongs to a branch that skips
   threads which are not eligible for purging -- confirm. */
538 silc_mutex_unlock(t->lock);
542 /* Signal the thread to stop */
544 silc_cond_signal(t->thread_signal);
545 silc_mutex_unlock(t->lock);
547 silc_list_del(tp->free_threads, t);
/* Leave the threads list positioned at its start for later users. */
554 silc_list_start(tp->threads);
555 silc_mutex_unlock(tp->lock);