5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 - 2008 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
20 #include "silcruntime.h"
22 /***************************** Thread Pool API *****************************/
24 /* Explanation of the thread pool execution.
26 When a new call is added to the thread pool by calling silc_thread_pool_run
27 it is assigned to the first free thread from the free list. If no threads
28 are available we take one from the threads list and assign the call to
29 its queue. The threads list yields a different thread each time, finally
30 wrapping around to the beginning. This way each thread gets a chance to execute queued calls.
33 The thread function silc_thread_pool_run_thread executes each call. After
34 executing the current call that has been assigned to it, it will check
35 if there are any queued calls in its queue, and it will execute all calls
36 from the queue. If there aren't any calls in the queue, it will attempt
37 to steal a call from some other thread and execute it.
39 The queue list is always consumed in last-in-first-out order. The most
40 recently added call gets priority. With full utilization this helps to
41 avoid CPU cache misses. Since the queues are thread specific with full
42 utilization each thread should always be doing work for the most recent
43 (and thus most important) calls. */
45 /************************** Types and definitions ***************************/
47 /* Thread pool thread context. Each thread contains the most current call
48 to be executed, and a list of queued calls. The same structure type is
   also used for the entries of `queue` and `free_queue`: both lists are
   initialized with this type and its `next` link, and queued calls are
   allocated with the size of this structure. */
49 typedef struct SilcThreadPoolThreadStruct {
50 struct SilcThreadPoolThreadStruct *next; /* Link used by the pool's threads list and by the per-thread call queue lists */
51 struct SilcThreadPoolThreadStruct *next2; /* Link used by the pool's free_threads list */
52 SilcThreadPool tp; /* The thread pool */
53 SilcCond thread_signal; /* Condition variable for signalling */
54 SilcMutex lock; /* Thread lock */
55 SilcList queue; /* Queue for waiting calls */
56 SilcList free_queue; /* Queue freelist */
57 SilcSchedule schedule; /* The current Scheduler, may be NULL */
58 SilcThreadPoolFunc run; /* The current call to run in a thread */
59 SilcTaskCallback completion; /* The current Completion function */
/* NOTE(review): the functions below also read and write a `run_context`
   member, but its declaration is not visible in this listing. */
61 void *completion_context; /* Context passed as last argument to the completion function */
62 unsigned int stop : 1; /* Set to stop the thread */
63 } *SilcThreadPoolThread;
65 /* Thread pool context. The functions below take `lock` around their
   accesses to the `threads` and `free_threads` lists and to `max_threads`.
   NOTE(review): the closing brace of this structure is not visible in this
   listing. */
66 struct SilcThreadPoolStruct {
67 SilcStack stack; /* Stack for memory allocation; the pool, thread and call contexts are allocated from it */
68 SilcCond pool_signal; /* Condition variable for signalling */
69 SilcMutex lock; /* Pool lock */
70 SilcList threads; /* Threads in the pool */
71 SilcList free_threads; /* Threads freelist */
72 SilcUInt16 min_threads; /* Minimum threads in the pool */
73 SilcUInt16 max_threads; /* Maximum threads in the pool */
/* NOTE(review): silc_thread_pool_alloc and silc_thread_pool_set_max_threads
   take SilcUInt32 values and store them in the two fields above; assuming
   SilcUInt16 is a 16-bit type, larger values would be silently truncated --
   confirm whether that is intended. */
74 SilcUInt16 refcnt; /* Reference counter */
75 unsigned int destroy : 1; /* Set when pool is to be destroyed */
78 /************************ Static utility functions **************************/
80 /* Reference thread pool. Must be called locked.
   NOTE(review): the debug line below prints `refcnt - 1` as the old value,
   which suggests the counter is incremented just before it; that statement
   is not visible in this listing. */
82 static void silc_thread_pool_ref(SilcThreadPool tp)
85 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
89 /* Unreference thread pool. Must be called locked. Releases the lock.
   The visible code has two exits: a destroy path that unlocks and frees the
   pool lock, frees the condition variable, frees the pool context and
   finally frees the stack, and a plain unlock.
   NOTE(review): the decrement and the condition selecting between the two
   exits are not visible in this listing; presumably the destroy path runs
   when the reference count reaches zero and returns before the final
   unlock -- confirm. */
91 static void silc_thread_pool_unref(SilcThreadPool tp)
94 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
97 SilcStack stack = tp->stack; /* Saved first: tp is freed below before the stack is */
98 silc_mutex_unlock(tp->lock);
99 silc_mutex_free(tp->lock);
100 silc_cond_free(tp->pool_signal);
101 silc_sfree(stack, tp);
102 silc_stack_free(stack);
105 silc_mutex_unlock(tp->lock);
108 /* The thread executor. Each thread in the pool is run here. They wait
109 here for something to do which is given to them by silc_thread_pool_run.
   `context` is the SilcThreadPoolThread of this thread. After running a
   call and its completion, the thread looks for a call in its own queue,
   then tries to take a queued call from another thread of the pool, and
   otherwise puts itself back on the pool's free_threads list. The stop
   path removes the thread from the pool, frees its queued call contexts,
   its lock, condition variable and context, and releases the pool
   reference.
   NOTE(review): the outer loop, the labels/gotos connecting the sections
   and several conditions are not visible in this listing, so the exact
   control flow between the sections below must be confirmed against the
   full file. */
111 static void *silc_thread_pool_run_thread(void *context)
113 SilcThreadPoolThread t = context, o, q; /* t: this thread, o: other thread to take a call from, q: queued call entry */
114 SilcThreadPool tp = t->tp; /* Kept in a local: still used in the stop path after t has been freed */
115 SilcMutex lock = t->lock;
116 SilcCond thread_signal = t->thread_signal;
118 silc_mutex_lock(lock);
121 /* Wait here for code to execute. The condition is re-checked after every
   wakeup, so the loop is left only when a call is set or stop is requested. */
122 while (!t->run && !t->stop)
123 silc_cond_wait(thread_signal, lock);
129 silc_mutex_unlock(lock); /* The thread lock is released before the call is run */
131 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
133 t->run(t->schedule, t->run_context);
/* NOTE(review): the comment that starts on the next line has no visible
   terminator in this listing (its last line appears to be missing). */
135 /* If scheduler is NULL, call completion directly from here. Otherwise
136 it is called through the scheduler in the thread where the scheduler
140 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
141 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
142 t->completion_context, 0, 0)) {
143 SILC_LOG_DEBUG(("Run completion directly"));
144 t->completion(NULL, NULL, 0, 0, t->completion_context);
146 silc_schedule_wakeup(t->schedule);
148 SILC_LOG_DEBUG(("Run completion directly"));
149 t->completion(NULL, NULL, 0, 0, t->completion_context);
153 silc_mutex_lock(lock);
157 /* Check if there are calls in queue. Takes the most recently added
158 call since new ones are added at the start of the list. */
159 if (silc_list_count(t->queue) > 0) {
161 silc_list_start(t->queue);
162 q = silc_list_get(t->queue); /* First entry of this thread's queue */
164 SILC_LOG_DEBUG(("Execute call from queue"));
166 /* Execute this call now: the queued call's parameters are copied into
   this thread's context */
168 t->run_context = q->run_context;
169 t->completion = q->completion;
170 t->completion_context = q->completion_context;
171 t->schedule = q->schedule;
173 silc_list_del(t->queue, q);
174 silc_list_add(t->free_queue, q); /* Entry is kept for reuse; silc_thread_pool_run takes entries from free_queue */
175 silc_mutex_unlock(lock);
179 silc_mutex_unlock(lock); /* Own lock is dropped before the pool lock is taken */
180 silc_mutex_lock(tp->lock);
182 /* Nothing to do. Attempt to steal call from some other thread. */
183 o = silc_list_get(tp->threads);
185 /* List wraps around */
186 silc_list_start(tp->threads);
187 o = silc_list_get(tp->threads);
190 /* Check that the other thread is valid and has something to execute. */
191 silc_mutex_lock(o->lock);
192 if (o == t || o->stop || silc_list_count(o->queue) == 0) { /* Skip ourselves, stopping threads and threads with an empty queue */
193 silc_mutex_unlock(o->lock);
198 silc_mutex_unlock(tp->lock);
199 silc_list_start(o->queue);
200 q = silc_list_get(o->queue); /* First entry of the other thread's queue */
202 SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
204 /* Execute this call now: copy the other thread's queued call into this
   thread's context */
206 t->run_context = q->run_context;
207 t->completion = q->completion;
208 t->completion_context = q->completion_context;
209 t->schedule = q->schedule;
211 silc_list_del(o->queue, q);
212 silc_list_add(o->free_queue, q); /* The entry goes to the other thread's freelist */
213 silc_mutex_unlock(o->lock);
217 silc_mutex_lock(lock);
219 silc_mutex_unlock(tp->lock);
223 /* Now that we have the lock back, check the queue again. The thread lock
   was released above, and silc_thread_pool_run queues calls while holding
   only the thread lock. */
224 if (silc_list_count(t->queue) > 0) {
225 silc_mutex_unlock(tp->lock);
229 /* The thread is now free for use again. */
231 t->completion = NULL;
233 silc_list_add(tp->free_threads, t);
234 silc_mutex_unlock(tp->lock);
238 /* Stop the thread. Remove from threads list. */
239 SILC_LOG_DEBUG(("Stop thread %p", t));
241 /* We can unlock the thread now. After we get the thread pool lock
242 no one can retrieve the thread anymore. */
243 silc_mutex_unlock(lock);
244 silc_mutex_lock(tp->lock);
246 silc_list_del(tp->threads, t);
247 silc_list_start(tp->threads); /* Rewind the threads list after the removal */
249 /* Clear thread's call queue. Both pending and recycled call contexts are
   returned to the pool's stack. */
250 silc_list_start(t->queue);
251 silc_list_start(t->free_queue);
252 while ((q = silc_list_get(t->queue)))
253 silc_sfree(tp->stack, q);
254 while ((q = silc_list_get(t->free_queue)))
255 silc_sfree(tp->stack, q);
257 /* Destroy the thread */
258 silc_mutex_free(lock);
259 silc_cond_free(thread_signal);
260 silc_sfree(tp->stack, t);
262 /* If we are last thread, signal the waiting destructor.
   silc_thread_pool_free waits on pool_signal until the threads list is
   empty when it is asked to wait for unfinished threads. */
263 if (silc_list_count(tp->threads) == 0)
264 silc_cond_signal(tp->pool_signal);
266 /* Release pool reference. Releases lock also. This pairs with the
   reference taken in silc_thread_pool_new_thread. */
267 silc_thread_pool_unref(tp);
272 /* Creates new thread to thread pool. Allocates the thread context from
   the pool's stack, allocates its lock and condition variable, initializes
   its call queues, adds it to the pool's threads and free_threads lists,
   takes a pool reference and starts the thread in
   silc_thread_pool_run_thread.
   NOTE(review): silc_thread_pool_ref must be called locked, so the pool
   lock is presumably required here too; silc_thread_pool_run calls this
   with the pool lock held, silc_thread_pool_alloc calls it while the pool
   is still being constructed.
   NOTE(review): the allocation check, the assignment of the thread's pool
   pointer and the return statements are not visible in this listing. */
274 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
276 SilcThreadPoolThread t;
278 t = silc_scalloc(tp->stack, 1, sizeof(*t));
282 if (!silc_mutex_alloc(&t->lock)) {
283 silc_sfree(tp->stack, t);
287 if (!silc_cond_alloc(&t->thread_signal)) {
288 silc_mutex_free(t->lock); /* Undo the lock allocation before freeing the context */
289 silc_sfree(tp->stack, t);
294 silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
295 silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
297 /* Add to thread pool */
298 silc_list_add(tp->threads, t);
299 silc_list_add(tp->free_threads, t);
300 silc_thread_pool_ref(tp); /* Reference held by the thread; released at the end of silc_thread_pool_run_thread */
302 SILC_LOG_DEBUG(("Start thread %p", t));
304 /* Start the thread.
   NOTE(review): the result of silc_thread_create is not checked here; if
   it can fail, the context would stay on both lists with the pool
   reference held -- confirm. */
305 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
310 /**************************** Thread Pool API *******************************/
312 /* Allocate thread pool.
   stack              passed to silc_stack_alloc to create the stack the
                      pool allocates from
   min_threads        minimum number of threads in the pool
   max_threads        maximum number of threads in the pool
   start_min_threads  when set, `min_threads` threads are created here
   Sets SILC_ERR_INVALID_ARGUMENT when max_threads is smaller than
   min_threads and for the "Max threads is 0" case.
   NOTE(review): local declarations, several conditions and all return
   statements are not visible in this listing; presumably the function
   returns the new pool, or NULL after the error paths below -- confirm. */
314 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
315 SilcUInt32 min_threads,
316 SilcUInt32 max_threads,
317 SilcBool start_min_threads)
322 if (max_threads < min_threads) {
323 silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
324 "Max threads is smaller than min threads (%d < %d)",
325 max_threads, min_threads);
329 silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
334 stack = silc_stack_alloc(0, stack); /* From here on `stack` is the newly allocated stack; it is what the error paths below free */
336 tp = silc_scalloc(stack, 1, sizeof(*tp));
338 silc_stack_free(stack);
342 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
343 tp, min_threads, max_threads));
/* NOTE(review): the 32-bit arguments are stored into SilcUInt16 fields
   below; values that do not fit would be truncated -- confirm. */
346 tp->min_threads = min_threads;
347 tp->max_threads = max_threads;
350 if (!silc_mutex_alloc(&tp->lock)) {
351 silc_sfree(stack, tp);
352 silc_stack_free(stack);
356 if (!silc_cond_alloc(&tp->pool_signal)) {
357 silc_mutex_free(tp->lock);
358 silc_sfree(stack, tp);
359 silc_stack_free(stack);
/* The threads list links contexts through `next`, the freelist through
   `next2`, so a thread can be on both lists at the same time. */
363 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
364 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
/* The result of silc_thread_pool_new_thread is not checked in this loop. */
366 for (i = 0; i < tp->min_threads && start_min_threads; i++)
367 silc_thread_pool_new_thread(tp);
369 silc_list_start(tp->threads); /* Rewind the threads list */
374 /* Free thread pool. Takes the pool lock, signals every thread in the
   threads list and, when `wait_unfinished` is set, waits on pool_signal
   until the threads list is empty. Finally releases a pool reference,
   which also releases the pool lock.
   NOTE(review): the statements that set the pool's `destroy` flag and each
   thread's `stop` flag are not visible in this listing; presumably they
   sit next to the lock and signal calls below -- confirm. */
376 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
378 SilcThreadPoolThread t;
380 SILC_LOG_DEBUG(("Free thread pool %p", tp));
382 silc_mutex_lock(tp->lock);
386 silc_list_start(tp->threads);
387 while ((t = silc_list_get(tp->threads))) {
388 silc_mutex_lock(t->lock); /* Thread lock is taken while the pool lock is held */
390 silc_cond_signal(t->thread_signal); /* Wake the thread if it is waiting in silc_thread_pool_run_thread */
391 silc_mutex_unlock(t->lock);
394 if (wait_unfinished) {
395 SILC_LOG_DEBUG(("Wait threads to finish"));
/* Each stopping thread removes itself from the threads list; the last one
   signals pool_signal. */
396 while (silc_list_count(tp->threads))
397 silc_cond_wait(tp->pool_signal, tp->lock);
400 /* Release reference. Releases lock also. */
401 silc_thread_pool_unref(tp);
404 /* Execute code in a thread in the pool.
   tp                  the thread pool
   schedule            scheduler stored with the call; the thread passes it
                       to `run` and uses it to deliver the completion
   run                 the function to execute in a thread
   completion          completion callback stored with the call
   completion_context  context passed to `completion`
   Visible flow: take a thread from free_threads; the max-threads branch
   either fails with SILC_ERR_LIMIT or queues the call on a thread picked
   from the threads list in turn; another branch creates a new thread;
   finally the call is stored in the chosen thread and the thread is
   signalled.
   NOTE(review): two parameters (one of them `run_context`, which the body
   uses), most branch conditions and the return statements are not visible
   in this listing; confirm the exact conditions against the full file. */
406 SilcBool silc_thread_pool_run(SilcThreadPool tp,
408 SilcSchedule schedule,
409 SilcThreadPoolFunc run,
411 SilcTaskCallback completion,
412 void *completion_context)
414 SilcThreadPoolThread t, q; /* t: chosen thread, q: queue entry for the call */
416 silc_mutex_lock(tp->lock);
/* Error exit with SILC_ERR_NOT_VALID; presumably taken when the pool is
   being destroyed -- the condition is not visible here. */
419 silc_mutex_unlock(tp->lock);
420 silc_set_errno(SILC_ERR_NOT_VALID);
424 /* Get free thread */
425 silc_list_start(tp->free_threads);
426 t = silc_list_get(tp->free_threads);
428 if (silc_list_count(tp->threads) + 1 > tp->max_threads) { /* One more thread would exceed the maximum */
429 /* Maximum threads reached */
431 silc_mutex_unlock(tp->lock);
432 silc_set_errno(SILC_ERR_LIMIT);
436 /* User wants to queue this call until thread becomes free. Get
437 a thread to assign this call. */
438 t = silc_list_get(tp->threads);
440 /* List wraps around */
441 silc_list_start(tp->threads);
442 t = silc_list_get(tp->threads);
444 silc_mutex_unlock(tp->lock); /* Pool lock is released before the thread lock is taken */
446 SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
447 run, run_context, t));
449 silc_mutex_lock(t->lock);
451 /* Get free call context from the list */
452 silc_list_start(t->free_queue);
453 q = silc_list_get(t->free_queue);
/* Allocate a new call context from the pool's stack; presumably only when
   the freelist was empty -- the condition is not visible here. */
455 q = silc_scalloc(tp->stack, 1, sizeof(*q));
457 silc_mutex_unlock(t->lock);
461 silc_list_del(t->free_queue, q);
465 q->run_context = run_context;
466 q->completion = completion;
467 q->completion_context = completion_context;
468 q->schedule = schedule;
470 /* Add at the start of the list. It gets executed first. */
471 silc_list_insert(t->queue, NULL, q);
472 silc_mutex_unlock(t->lock);
475 /* Create new thread */
476 t = silc_thread_pool_new_thread(tp);
478 silc_mutex_unlock(tp->lock);
/* The chosen thread is taken off the freelist while the pool lock is held. */
484 silc_list_del(tp->free_threads, t);
485 silc_mutex_unlock(tp->lock);
487 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
489 silc_mutex_lock(t->lock);
491 /* Mark this call to be executed in this thread */
493 t->run_context = run_context;
494 t->completion = completion;
495 t->completion_context = completion_context;
496 t->schedule = schedule;
498 /* Signal the thread. It waits on thread_signal in
   silc_thread_pool_run_thread. */
499 silc_cond_signal(t->thread_signal);
500 silc_mutex_unlock(t->lock);
505 /* Set maximum threads in the pool. The new value is stored while holding
   the pool lock.
   NOTE(review): the value is not validated here, whereas
   silc_thread_pool_alloc rejects a maximum smaller than the minimum and
   reports a "Max threads is 0" error. The SilcUInt32 argument is stored in
   a SilcUInt16 field, so values that do not fit would be truncated --
   confirm. */
507 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
508 SilcUInt32 max_threads)
510 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
512 silc_mutex_lock(tp->lock);
513 tp->max_threads = max_threads;
514 silc_mutex_unlock(tp->lock);
517 /* Get maximum threads in the pool. The value is read while holding the
   pool lock.
   NOTE(review): the return statement is not visible in this listing;
   presumably `max_threads` is returned. */
519 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
521 SilcUInt32 max_threads;
523 silc_mutex_lock(tp->lock);
524 max_threads = tp->max_threads;
525 silc_mutex_unlock(tp->lock);
530 /* Get number of free threads in the pool. The count of the free_threads
   list is read while holding the pool lock.
   NOTE(review): the return statement is not visible in this listing;
   presumably `free_threads` is returned. */
532 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
534 SilcUInt32 free_threads;
536 silc_mutex_lock(tp->lock);
537 free_threads = silc_list_count(tp->free_threads);
538 silc_mutex_unlock(tp->lock);
/* Purge threads from the pool. Does nothing when the number of free
   threads is not larger than min_threads. Otherwise computes how many free
   threads exceed the minimum, walks the threads list, signals threads to
   stop and removes them from the free_threads list.
   NOTE(review): the declaration of `i`, the test that skips a thread, the
   assignment of the thread's `stop` flag and the loop's exit condition are
   not visible in this listing; presumably busy threads are skipped and the
   loop ends after `i` threads -- confirm. */
545 void silc_thread_pool_purge(SilcThreadPool tp)
547 SilcThreadPoolThread t;
550 silc_mutex_lock(tp->lock);
552 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
553 SILC_LOG_DEBUG(("No threads to purge"));
554 silc_mutex_unlock(tp->lock);
558 i = silc_list_count(tp->free_threads) - tp->min_threads; /* Number of free threads above the minimum */
560 SILC_LOG_DEBUG(("Purge %d threads", i));
562 silc_list_start(tp->threads);
563 while ((t = silc_list_get(tp->threads))) {
564 silc_mutex_lock(t->lock); /* Thread lock is taken while the pool lock is held */
566 silc_mutex_unlock(t->lock);
570 /* Signal the thread to stop */
572 silc_cond_signal(t->thread_signal);
573 silc_mutex_unlock(t->lock);
575 silc_list_del(tp->free_threads, t); /* A stopping thread must not be handed new calls */
582 silc_list_start(tp->threads); /* Rewind the threads list */
583 silc_mutex_unlock(tp->lock);
586 /*************************** Thread-local Storage ***************************/
588 /* Set Tls context. Stores `context` in the `thread_context` member of
   the calling thread's Tls.
   NOTE(review): the conditions around the initialization below are not
   visible in this listing; presumably the Tls is initialized only when
   silc_thread_get_tls returned none -- confirm. */
590 void silc_thread_tls_set(void *context)
592 SilcTls tls = silc_thread_get_tls();
595 /* Initialize Tls for this thread */
596 tls = silc_thread_tls_init();
601 tls->thread_context = context;
604 /* Get Tls context. Returns the `thread_context` member of the Tls
   obtained from silc_thread_get_tls.
   NOTE(review): any handling of a missing Tls before the dereference is
   not visible in this listing -- confirm. */
606 void *silc_thread_tls_get(void)
608 SilcTls tls = silc_thread_get_tls();
611 return tls->thread_context;