5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2007 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
22 /* Explanation of the thread pool execution.
24 When a new call is added to the thread pool by calling silc_thread_pool_run
25 it is assigned to the first free thread from the free list. If no threads
26 are available we take one from the threads list and assign the call to
27 its queue. The threads list always yields a different thread, finally wrapping
28 around to the beginning. This way each thread will get a chance to execute
queued calls.
31 The thread function silc_thread_pool_run_thread executes each call. After
32 executing the current call that has been assigned to it, it will check
33 if there are any queued calls in its queue, and it will execute all calls
34 from the queue. If there aren't any calls in the queue, it will attempt
35 to steal a call from some other thread and execute it.
37 The queue list is always consumed in last-in-first-out order. The most
38 recently added call gets priority. With full utilization this helps to
39 avoid CPU cache misses. Since the queues are thread specific with full
40 utilization each thread should always be doing work for the most recent
41 (and thus most important) calls. */
43 /************************** Types and definitions ***************************/
45 /* Thread pool thread context. Each thread contains the most current call
46 to be executed, and a list of queued calls. */
/* The same structure type is also used for the entries stored in a
   thread's queue and free_queue: those lists are initialized on the next
   member and the entries are allocated with the size of this structure,
   with the call fields (schedule, completion and the contexts) filled in.
   The pool's threads list also links through next, while the pool's
   free_threads list links through next2, so a thread context can be on
   both pool lists at the same time.
   NOTE(review): other code in this file reads and writes a run_context
   member; its declaration is not visible in this excerpt. */
47 typedef struct SilcThreadPoolThreadStruct {
48 struct SilcThreadPoolThreadStruct *next; /* Link used by lists initialized on next */
49 struct SilcThreadPoolThreadStruct *next2; /* Link used by the pool's free_threads list */
50 SilcThreadPool tp; /* The thread pool */
51 SilcMutex lock; /* Thread lock */
52 SilcList queue; /* Queue for waiting calls */
53 SilcList free_queue; /* Queue freelist */
54 SilcSchedule schedule; /* The current Scheduler, may be NULL */
55 SilcThreadPoolFunc run; /* The current call to run in a thread */
56 SilcTaskCallback completion; /* The current Completion function */
58 void *completion_context; /* Context passed to the completion function */
59 unsigned int stop : 1; /* Set to stop the thread */
60 } *SilcThreadPoolThread;
62 /* Thread pool context */
/* The pool itself is allocated from its own stack member.  The pool lock
   is held around accesses to the two thread lists.  pool_signal serves two
   purposes in this file: worker threads wait on it for work or a stop
   request, and silc_thread_pool_free waits on it until the threads list
   is empty.
   A reference is taken for every thread that is created and one is
   released by each stopping thread and by silc_thread_pool_free.
   NOTE(review): min_threads and max_threads are 16-bit here while the API
   functions accept SilcUInt32 values, so values above 65535 do not fit. */
63 struct SilcThreadPoolStruct {
64 SilcStack stack; /* Stack for memory allocation */
65 SilcMutex lock; /* Pool lock */
66 SilcCond pool_signal; /* Condition variable for signalling */
67 SilcList threads; /* Threads in the pool */
68 SilcList free_threads; /* Threads freelist */
69 SilcUInt16 min_threads; /* Minimum threads in the pool */
70 SilcUInt16 max_threads; /* Maximum threads in the pool */
71 SilcUInt16 refcnt; /* Reference counter */
72 unsigned int destroy : 1; /* Set when pool is to be destroyed */
75 /************************ Static utility functions **************************/
77 /* Reference thread pool. Must be called locked. */
/* The debug message prints refcnt - 1 as the old value and the current
   refcnt as the new one, i.e. it is written to run after the counter has
   been incremented.
   NOTE(review): the increment statement itself is not visible in this
   excerpt. */
79 static void silc_thread_pool_ref(SilcThreadPool tp)
82 SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
86 /* Unreference thread pool. Must be called locked. Releases the lock. */
/* The debug message prints refcnt + 1 as the old value, so it is written
   to run after the counter has been decremented.
   Teardown path: the stack handle is copied to a local first because the
   pool structure is itself allocated from that stack and is freed before
   the stack is.  The pool lock is unlocked before it is freed.
   The final unlock is the path where the pool stays alive.
   NOTE(review): the decrement and the condition that selects the teardown
   path are not visible in this excerpt. */
88 static void silc_thread_pool_unref(SilcThreadPool tp)
91 SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
94 SilcStack stack = tp->stack;
95 silc_mutex_unlock(tp->lock);
96 silc_mutex_free(tp->lock);
97 silc_cond_free(tp->pool_signal);
98 silc_sfree(stack, tp);
99 silc_stack_free(stack);
102 silc_mutex_unlock(tp->lock);
105 /* The thread executor. Each thread in the pool is run here. They wait
106 here for something to do which is given to them by silc_thread_pool_run. */
/* context is the SilcThreadPoolThread of the calling thread.  The pool
   lock and the condition variable are copied into locals up front; the
   stop path below frees t and afterwards uses only tp and those locals.

   Visible flow:
   1. Under the pool lock, sleep on pool_signal until t->run or t->stop
      is set.
   2. If stopping: unlink from tp->threads, free every entry of the
      thread's queue and free_queue, free the thread lock and the context,
      signal pool_signal when the threads list has become empty, and drop
      the pool reference (which also releases the pool lock).
   3. Otherwise run the call and then its completion, either as a zero
      timeout task on the call's scheduler or directly.
   4. Take the entry at the head of the thread's own queue; failing that,
      look at the queue of another thread picked from tp->threads.  A used
      entry is moved to the owning thread's free_queue for reuse.
   5. With nothing left, clear the call fields and put the thread back on
      tp->free_threads.
   NOTE(review): braces, several branch conditions, jumps and the return
   statement are not visible in this excerpt, so the exact control flow
   between these steps must be confirmed against the full file. */
108 static void *silc_thread_pool_run_thread(void *context)
110 SilcThreadPoolThread t = context, o, q;
111 SilcThreadPool tp = t->tp;
112 SilcMutex lock = tp->lock;
113 SilcCond pool_signal = tp->pool_signal;
115 silc_mutex_lock(lock);
118 /* Wait here for code to execute */
119 while (!t->run && !t->stop)
120 silc_cond_wait(pool_signal, lock);
122 if (silc_unlikely(t->stop)) {
123 /* Stop the thread. Remove from threads list and free memory. */
124 SILC_LOG_DEBUG(("Stop thread %p", t));
125 silc_list_del(tp->threads, t);
126 silc_list_start(tp->threads);
128 /* Clear thread's call queue. */
129 silc_list_start(t->queue);
130 silc_list_start(t->free_queue);
131 while ((q = silc_list_get(t->queue)))
132 silc_sfree(tp->stack, q);
133 while ((q = silc_list_get(t->free_queue)))
134 silc_sfree(tp->stack, q);
/* After the two frees below t is gone; only tp, lock and pool_signal
   are used from here on. */
136 silc_mutex_free(t->lock);
137 silc_sfree(tp->stack, t);
139 /* If we are last thread, signal the waiting destructor. */
140 if (silc_list_count(tp->threads) == 0)
141 silc_cond_signal(pool_signal);
143 /* Release pool reference. Releases lock also. */
144 silc_thread_pool_unref(tp);
147 silc_mutex_unlock(lock);
151 SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
153 t->run(t->schedule, t->run_context);
155 /* If scheduler is NULL, call completion directly from here. Otherwise
156 it is called through the scheduler in the thread where the scheduler
160 SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
161 if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
162 t->completion_context, 0, 0)) {
163 SILC_LOG_DEBUG(("Run completion directly"));
164 t->completion(NULL, NULL, 0, 0, t->completion_context);
166 silc_schedule_wakeup(t->schedule);
168 SILC_LOG_DEBUG(("Run completion directly"));
169 t->completion(NULL, NULL, 0, 0, t->completion_context);
173 /* Check if there are calls in queue. Takes the most recently added
174 call since new ones are added at the start of the list. */
175 silc_mutex_lock(t->lock);
176 if (silc_list_count(t->queue) > 0) {
177 silc_list_start(t->queue);
178 q = silc_list_get(t->queue);
180 SILC_LOG_DEBUG(("Execute call from queue"));
182 /* Execute this call now */
184 t->run_context = q->run_context;
185 t->completion = q->completion;
186 t->completion_context = q->completion_context;
187 t->schedule = q->schedule;
/* Recycle the queue entry instead of freeing it. */
189 silc_list_del(t->queue, q);
190 silc_list_add(t->free_queue, q);
191 silc_mutex_unlock(t->lock);
194 silc_mutex_unlock(t->lock);
196 /* Nothing to do. Attempt to steal call from some other thread. */
/* The candidate is read from the shared threads list cursor under the
   pool lock; its queue is then inspected under that thread's own lock. */
197 silc_mutex_lock(lock);
198 o = silc_list_get(tp->threads);
200 /* List wraps around */
201 silc_list_start(tp->threads);
202 o = silc_list_get(tp->threads);
204 silc_mutex_unlock(lock);
207 silc_mutex_lock(o->lock);
208 if (silc_list_count(o->queue) > 0) {
209 silc_list_start(o->queue);
210 q = silc_list_get(o->queue);
212 SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
214 /* Execute this call now */
216 t->run_context = q->run_context;
217 t->completion = q->completion;
218 t->completion_context = q->completion_context;
219 t->schedule = q->schedule;
/* The stolen entry is recycled into the other thread's free_queue. */
221 silc_list_del(o->queue, q);
222 silc_list_add(o->free_queue, q);
223 silc_mutex_unlock(o->lock);
226 silc_mutex_unlock(o->lock);
230 /* The thread is now free for use again. */
232 t->completion = NULL;
/* The free list is modified with the pool lock held. */
235 silc_mutex_lock(lock);
236 silc_list_add(tp->free_threads, t);
242 /* Creates new thread to thread pool */
/* Allocates a zeroed thread context from the pool stack and a lock for
   it (the context is freed again when the lock cannot be allocated),
   initializes the per-thread queue and free_queue lists, adds the context
   to both the pool's threads and free_threads lists, takes a pool
   reference on behalf of the thread and starts the thread in
   silc_thread_pool_run_thread with the context as its argument.
   This function does not lock the pool itself; silc_thread_pool_run calls
   it with the pool lock held.
   NOTE(review): the result of silc_thread_create is not checked in the
   visible lines; if creation can fail, the context would stay on both
   lists with no thread serving it.  The NULL check after the allocation,
   the assignment of t->tp and the return statements are not visible in
   this excerpt. */
244 static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
246 SilcThreadPoolThread t;
248 t = silc_scalloc(tp->stack, 1, sizeof(*t));
252 if (!silc_mutex_alloc(&t->lock)) {
253 silc_sfree(tp->stack, t);
258 silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
259 silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
261 /* Add to thread pool */
262 silc_list_add(tp->threads, t);
263 silc_list_add(tp->free_threads, t);
264 silc_thread_pool_ref(tp);
266 SILC_LOG_DEBUG(("Start thread %p", t));
268 /* Start the thread */
269 silc_thread_create(silc_thread_pool_run_thread, t, FALSE);
274 /**************************** Thread Pool API *******************************/
276 /* Allocate thread pool */
/* stack is passed to silc_stack_alloc to create the stack that the pool
   and everything it allocates lives on.  min_threads and max_threads are
   the thread limits; a max_threads smaller than min_threads is tested for
   explicitly.  When start_min_threads is set, min_threads threads are
   created immediately; otherwise none are created here.
   Each failure path releases what has been allocated so far (pool
   structure, lock, stack).  The threads list cursor is reset at the end.
   NOTE(review): the limits are stored into 16-bit fields, so arguments
   above 65535 are narrowed.  Local declarations, the actions taken by the
   checks and the return statements are not visible in this excerpt. */
278 SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
279 SilcUInt32 min_threads,
280 SilcUInt32 max_threads,
281 SilcBool start_min_threads)
286 if (max_threads < min_threads)
292 stack = silc_stack_alloc(0, stack);
294 tp = silc_scalloc(stack, 1, sizeof(*tp));
296 silc_stack_free(stack);
300 SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
301 tp, min_threads, max_threads));
304 tp->min_threads = min_threads;
305 tp->max_threads = max_threads;
308 if (!silc_mutex_alloc(&tp->lock)) {
309 silc_sfree(stack, tp);
310 silc_stack_free(stack);
314 if (!silc_cond_alloc(&tp->pool_signal)) {
315 silc_mutex_free(tp->lock);
316 silc_sfree(stack, tp);
317 silc_stack_free(stack);
/* threads links through next and free_threads through next2, so one
   thread context can be a member of both lists. */
321 silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
322 silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
324 for (i = 0; i < tp->min_threads && start_min_threads; i++)
325 silc_thread_pool_new_thread(tp);
327 silc_list_start(tp->threads);
332 /* Free thread pool */
/* With the pool lock held, walks every thread on the threads list and
   then wakes all threads waiting on pool_signal.  When wait_unfinished is
   set, the caller blocks on pool_signal until the threads list is empty;
   stopping threads signal it when they remove the last entry.  Finally
   the caller's pool reference is released, which also releases the lock.
   NOTE(review): the body of the loop over the threads (presumably the
   stop request) and any statement between taking the lock and the loop
   are not visible in this excerpt. */
334 void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
336 SilcThreadPoolThread t;
338 SILC_LOG_DEBUG(("Free thread pool %p", tp));
340 silc_mutex_lock(tp->lock);
344 silc_list_start(tp->threads);
345 while ((t = silc_list_get(tp->threads)))
347 silc_cond_broadcast(tp->pool_signal);
349 if (wait_unfinished) {
350 SILC_LOG_DEBUG(("Wait threads to finish"));
351 while (silc_list_count(tp->threads))
352 silc_cond_wait(tp->pool_signal, tp->lock);
355 /* Release reference. Releases lock also. */
356 silc_thread_pool_unref(tp);
359 /* Execute code in a thread in the pool */
/* Hands the call run, with its completion callback and contexts and the
   optional schedule, to a pool thread.  Everything is done with the pool
   lock held.

   Visible flow:
   - The first thread on free_threads is looked up.
   - When adding one more thread would exceed max_threads, the call is
     either rejected (the lock is released) or queued: a thread is taken
     from the threads list cursor, wrapping to the start at the end of the
     list, and its own lock is taken as well.  A queue entry is reused
     from that thread's free_queue or freshly allocated (both locks are
     released when allocation fails), filled in and inserted at the head
     of the thread's queue.
   - Otherwise a new thread is created (the lock is released on failure).
   - The chosen free thread gets the call stored in its context, is
     removed from free_threads, and all waiters on pool_signal are woken.
   NOTE(review): two parameters, the branch conditions, the assignments of
   the run member and all return statements are not visible in this
   excerpt; the description of when each path is taken must be confirmed
   against the full file. */
361 SilcBool silc_thread_pool_run(SilcThreadPool tp,
363 SilcSchedule schedule,
364 SilcThreadPoolFunc run,
366 SilcTaskCallback completion,
367 void *completion_context)
369 SilcThreadPoolThread t, q;
371 silc_mutex_lock(tp->lock);
374 silc_mutex_unlock(tp->lock);
378 /* Get free thread */
379 silc_list_start(tp->free_threads);
380 t = silc_list_get(tp->free_threads);
382 if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
383 /* Maximum threads reached */
385 silc_mutex_unlock(tp->lock);
389 /* User wants to queue this call until thread becomes free. Get
390 a thread to assign this call. */
391 t = silc_list_get(tp->threads);
393 /* List wraps around */
394 silc_list_start(tp->threads);
395 t = silc_list_get(tp->threads);
398 SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
399 run, run_context, t));
401 /* Lock the thread. Keep also thread pool lock so that this thread
402 cannot become free while we're adding call to its queue. */
403 silc_mutex_lock(t->lock);
405 /* Get free call context from the list */
406 silc_list_start(t->free_queue);
407 q = silc_list_get(t->free_queue);
/* Queue entries have the same type and size as a thread context. */
409 q = silc_scalloc(tp->stack, 1, sizeof(*q));
411 silc_mutex_unlock(t->lock);
412 silc_mutex_unlock(tp->lock);
416 silc_list_del(t->free_queue, q);
420 q->run_context = run_context;
421 q->completion = completion;
422 q->completion_context = completion_context;
423 q->schedule = schedule;
425 /* Add at the start of the list. It gets executed first. */
426 silc_list_insert(t->queue, NULL, q);
427 silc_mutex_unlock(t->lock);
428 silc_mutex_unlock(tp->lock);
431 /* Create new thread */
432 t = silc_thread_pool_new_thread(tp);
434 silc_mutex_unlock(tp->lock);
440 SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
442 /* Mark this call to be executed in this thread */
444 t->run_context = run_context;
445 t->completion = completion;
446 t->completion_context = completion_context;
447 t->schedule = schedule;
448 silc_list_del(tp->free_threads, t);
/* All pool threads wait on the same condition variable, so every waiter
   is woken and each rechecks its own run and stop fields. */
451 silc_cond_broadcast(tp->pool_signal);
453 silc_mutex_unlock(tp->lock);
457 /* Set maximum threads in the pool */
459 void silc_thread_pool_set_max_threads(SilcThreadPool tp,
460 SilcUInt32 max_threads)
462 SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));
464 silc_mutex_lock(tp->lock);
465 tp->max_threads = max_threads;
466 silc_mutex_unlock(tp->lock);
469 /* Get maximum threads in the pool */
/* Reads tp->max_threads into a local while holding the pool lock, so the
   value is a consistent snapshot with respect to
   silc_thread_pool_set_max_threads.
   NOTE(review): the return statement is not visible in this excerpt;
   presumably the local is returned. */
471 SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
473 SilcUInt32 max_threads;
475 silc_mutex_lock(tp->lock);
476 max_threads = tp->max_threads;
477 silc_mutex_unlock(tp->lock);
482 /* Get number of free threads in the pool */
/* Counts the entries of the pool's free_threads list while holding the
   pool lock.  The count can be stale as soon as the lock is released.
   NOTE(review): the return statement is not visible in this excerpt;
   presumably the local is returned. */
484 SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
486 SilcUInt32 free_threads;
488 silc_mutex_lock(tp->lock);
489 free_threads = silc_list_count(tp->free_threads);
490 silc_mutex_unlock(tp->lock);
/* Stops surplus free threads.  With the pool lock held: when the number
   of free threads does not exceed min_threads there is nothing to do and
   the lock is released.  Otherwise i is set to the number of free threads
   above min_threads, the threads list is walked with each thread's own
   lock taken briefly, selected threads are removed from free_threads, all
   waiters on pool_signal are woken so that stopping threads can exit, and
   the threads list cursor is reset before the pool lock is released.
   NOTE(review): the declaration of i, the condition under which a thread
   is skipped, the statement that requests the stop and the loop
   termination on i are not visible in this excerpt. */
497 void silc_thread_pool_purge(SilcThreadPool tp)
499 SilcThreadPoolThread t;
502 silc_mutex_lock(tp->lock);
504 if (silc_list_count(tp->free_threads) <= tp->min_threads) {
505 SILC_LOG_DEBUG(("No threads to purge"));
506 silc_mutex_unlock(tp->lock);
510 i = silc_list_count(tp->free_threads) - tp->min_threads;
512 SILC_LOG_DEBUG(("Purge %d threads", i));
514 silc_list_start(tp->threads);
515 while ((t = silc_list_get(tp->threads))) {
516 silc_mutex_lock(t->lock);
518 silc_mutex_unlock(t->lock);
523 silc_mutex_unlock(t->lock);
525 silc_list_del(tp->free_threads, t);
532 /* Signal threads to stop */
533 silc_cond_broadcast(tp->pool_signal);
535 silc_list_start(tp->threads);
536 silc_mutex_unlock(tp->lock);