/*

  silcthread.c

  Author: Pekka Riikonen

  Copyright (C) 2007 Pekka Riikonen

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

*/

#include "silc.h"

/************************** Types and definitions ***************************/

/* Thread pool thread context.  The same structure is used both for the
   worker threads of the pool and for calls waiting in the pool's queue.
   The `next' link is used by the `threads', `queue' and `free_queue'
   lists, the `next2' link by the `free_threads' list, so a worker thread
   can be on `threads' and `free_threads' at the same time. */
typedef struct SilcThreadPoolThreadStruct {
  struct SilcThreadPoolThreadStruct *next;
  struct SilcThreadPoolThreadStruct *next2;
  SilcThreadPool tp;              /* The thread pool */
  SilcSchedule schedule;          /* Scheduler, may be NULL */
  SilcThreadPoolFunc run;         /* The function to run in a thread */
  SilcTaskCallback completion;    /* Completion function */
  void *run_context;
  void *completion_context;
  unsigned int stop        : 1;   /* Set to stop the thread */
} *SilcThreadPoolThread;

/* Thread pool context */
struct SilcThreadPoolStruct {
  SilcStack stack;                /* Stack for memory allocation */
  SilcMutex lock;                 /* Pool lock */
  SilcCond pool_signal;           /* Condition variable for signalling */
  SilcList threads;               /* Threads in the pool */
  SilcList free_threads;          /* Threads freelist */
  SilcList queue;                 /* Queue for waiting calls */
  SilcList free_queue;            /* Queue freelist */
  SilcUInt16 min_threads;         /* Minimum threads in the pool */
  SilcUInt16 max_threads;         /* Maximum threads in the pool */
  SilcUInt16 refcnt;              /* Reference counter */
  unsigned int destroy     : 1;   /* Set when pool is to be destroyed */
};

/************************ Static utility functions **************************/

/* Reference thread pool.  Must be called locked. */

static void silc_thread_pool_ref(SilcThreadPool tp)
{
  tp->refcnt++;
  SILC_LOG_DEBUG(("Thread pool %p, refcnt %d -> %d", tp, tp->refcnt - 1,
                  tp->refcnt));
}

/* Unreference thread pool.  Must be called locked.  Releases the lock.
   When the last reference is dropped the lock, the condition variable,
   the pool context and the pool's stack are freed. */

static void silc_thread_pool_unref(SilcThreadPool tp)
{
  tp->refcnt--;
  SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp, tp->refcnt + 1,
                  tp->refcnt));
  if (!tp->refcnt) {
    /* Take a copy of the stack, it is needed after `tp' is freed */
    SilcStack stack = tp->stack;
    silc_mutex_unlock(tp->lock);
    silc_mutex_free(tp->lock);
    silc_cond_free(tp->pool_signal);
    silc_sfree(stack, tp);
    silc_stack_free(stack);
    return;
  }
  silc_mutex_unlock(tp->lock);
}

/* The thread executor.  Each thread in the pool is run here.  They wait
   here for something to do which is given to them by silc_thread_pool_run.
   The `context' is the SilcThreadPoolThread of this thread.  Always
   returns NULL. */

static void *silc_thread_pool_run_thread(void *context)
{
  SilcThreadPoolThread t = context, q;
  SilcThreadPool tp = t->tp;
  SilcMutex lock = tp->lock;
  SilcCond pool_signal = tp->pool_signal;

  silc_mutex_lock(lock);

  while (1) {
    /* Wait here for code to execute */
    while (!t->run && !t->stop)
      silc_cond_wait(pool_signal, lock);

    if (t->stop) {
      /* Stop the thread.  Remove from threads list and free memory.
         The thread is not on the free threads list at this point: the
         purge removes it from there, the destructor empties the list,
         and a stopped thread is never put back on it (see below). */
      SILC_LOG_DEBUG(("Stop thread %p", t));
      silc_list_del(tp->threads, t);
      silc_sfree(tp->stack, t);

      /* If we are last thread, signal the waiting destructor. */
      if (silc_list_count(tp->threads) == 0)
        silc_cond_broadcast(pool_signal);

      /* Release pool reference.  Releases lock also. */
      silc_thread_pool_unref(tp);
      break;
    }
    silc_mutex_unlock(lock);

    /* Execute code */
    SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
                    t->run_context, t));
    t->run(t->schedule, t->run_context);

    /* If scheduler is NULL, call completion directly from here.  Otherwise
       it is called through the scheduler in the thread where the scheduler
       is running. */
    if (t->completion) {
      if (t->schedule) {
        SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
        if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
                                            t->completion_context, 0, 0)) {
          /* Could not add the task, fall back to a direct call */
          SILC_LOG_DEBUG(("Run completion directly"));
          t->completion(NULL, NULL, 0, 0, t->completion_context);
        }
        silc_schedule_wakeup(t->schedule);
      } else {
        SILC_LOG_DEBUG(("Run completion directly"));
        t->completion(NULL, NULL, 0, 0, t->completion_context);
      }
    }

    silc_mutex_lock(lock);

    /* Check if there are calls in queue */
    if (silc_list_count(tp->queue) > 0) {
      silc_list_start(tp->queue);
      q = silc_list_get(tp->queue);

      SILC_LOG_DEBUG(("Execute call from queue"));

      /* Execute this call now */
      t->run = q->run;
      t->run_context = q->run_context;
      t->completion = q->completion;
      t->completion_context = q->completion_context;
      t->schedule = q->schedule;

      /* Recycle the queue entry */
      silc_list_del(tp->queue, q);
      silc_list_add(tp->free_queue, q);
      continue;
    }

    /* The thread is now free for use again. */
    t->run = NULL;
    t->completion = NULL;
    t->schedule = NULL;

    /* A thread that was told to stop while it was executing a call frees
       its own context on the next round of the loop.  It must not be put
       on the free threads list, otherwise the list would be left holding
       a pointer to the released context. */
    if (!t->stop)
      silc_list_add(tp->free_threads, t);
  }

  return NULL;
}

/* Creates new thread to thread pool.  The new thread is added to the
   `threads' and `free_threads' lists and it takes a reference of the
   pool.  Returns the thread context or NULL if memory allocation for it
   failed. */

static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
{
  SilcThreadPoolThread t;

  t = silc_scalloc(tp->stack, 1, sizeof(*t));
  if (!t)
    return NULL;
  t->tp = tp;
  silc_list_add(tp->threads, t);
  silc_list_add(tp->free_threads, t);
  silc_thread_pool_ref(tp);

  SILC_LOG_DEBUG(("Start thread %p", t));

  /* Start the thread */
  /* NOTE(review): the return value of silc_thread_create is ignored here.
     If thread creation can fail, `t' stays in the lists without a thread
     behind it - confirm against the silc_thread_create documentation. */
  silc_thread_create(silc_thread_pool_run_thread, t, FALSE);

  return t;
}

/**************************** Thread Pool API *******************************/

/* Allocate thread pool.  If `stack' is non-NULL a child stack is allocated
   from it and used for the pool's memory.  Returns NULL if `max_threads'
   is smaller than `min_threads' or if allocation fails.  If
   `start_min_threads' is TRUE `min_threads' threads are started
   immediately. */

SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
                                      SilcUInt32 min_threads,
                                      SilcUInt32 max_threads,
                                      SilcBool start_min_threads)
{
  SilcThreadPool tp;
  int i;

  if (max_threads < min_threads)
    return NULL;

  if (stack)
    stack = silc_stack_alloc(0, stack);

  tp = silc_scalloc(stack, 1, sizeof(*tp));
  if (!tp) {
    silc_stack_free(stack);
    return NULL;
  }

  SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
                  tp, min_threads, max_threads));

  tp->stack = stack;
  /* NOTE(review): the limits are stored into SilcUInt16 fields, so larger
     values given by the caller are presumably narrowed - confirm whether
     the API should reject or clamp them. */
  tp->min_threads = min_threads;
  tp->max_threads = max_threads;
  tp->refcnt++;                 /* The caller's reference */

  if (!silc_mutex_alloc(&tp->lock)) {
    silc_sfree(stack, tp);
    silc_stack_free(stack);
    return NULL;
  }

  if (!silc_cond_alloc(&tp->pool_signal)) {
    silc_mutex_free(tp->lock);
    silc_sfree(stack, tp);
    silc_stack_free(stack);
    return NULL;
  }

  silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
  silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);

  for (i = 0; i < tp->min_threads && start_min_threads; i++)
    silc_thread_pool_new_thread(tp);

  return tp;
}

/* Free thread pool.  Marks the pool destroyed, tells every thread to stop
   and, if `wait_unfinished' is TRUE, blocks until all threads have removed
   themselves from the pool.  Calls still waiting in the queue are freed
   without being executed.  The pool itself is freed when the last
   reference to it is released. */

void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
{
  SilcThreadPoolThread t;

  SILC_LOG_DEBUG(("Free thread pool %p", tp));

  silc_mutex_lock(tp->lock);
  tp->destroy = TRUE;

  /* Stop threads */
  silc_list_start(tp->threads);
  while ((t = silc_list_get(tp->threads)))
    t->stop = TRUE;

  /* Every thread is now going to free its own context.  Empty the free
     threads list so that it does not keep pointers to the contexts after
     they have been released. */
  silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);

  silc_cond_broadcast(tp->pool_signal);

  if (wait_unfinished) {
    SILC_LOG_DEBUG(("Wait threads to finish"));
    while (silc_list_count(tp->threads))
      silc_cond_wait(tp->pool_signal, tp->lock);
  }

  /* Free calls from queue */
  silc_list_start(tp->queue);
  while ((t = silc_list_get(tp->queue)))
    silc_sfree(tp->stack, t);
  silc_list_start(tp->free_queue);
  while ((t = silc_list_get(tp->free_queue)))
    silc_sfree(tp->stack, t);
  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);

  /* Release reference.  Releases lock also. */
  silc_thread_pool_unref(tp);
}

/* Execute code in a thread in the pool.  The `run' function is called
   with `schedule' and `run_context' in a free thread, or in a new thread
   if no thread is free and the maximum has not been reached.  If the
   maximum has been reached the call is queued when `queuable' is TRUE.
   The `completion', if non-NULL, is called with `completion_context'
   after `run' has returned.  Returns FALSE if `run' is NULL, the pool is
   being destroyed, the call could not be queued, or memory allocation
   failed. */

SilcBool silc_thread_pool_run(SilcThreadPool tp,
                              SilcBool queuable,
                              SilcSchedule schedule,
                              SilcThreadPoolFunc run,
                              void *run_context,
                              SilcTaskCallback completion,
                              void *completion_context)
{
  SilcThreadPoolThread t;

  /* The threads wait until their `run' is non-NULL.  A call without a
     function would take a thread off the free list without ever waking
     it up, so refuse it. */
  if (!run)
    return FALSE;

  silc_mutex_lock(tp->lock);

  if (tp->destroy) {
    silc_mutex_unlock(tp->lock);
    return FALSE;
  }

  /* Get free thread */
  silc_list_start(tp->free_threads);
  t = silc_list_get(tp->free_threads);
  if (!t) {
    if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
      /* Maximum threads reached */
      if (!queuable) {
        silc_mutex_unlock(tp->lock);
        return FALSE;
      }

      SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));

      /* User wants to queue this call until thread becomes free.  Reuse
         an entry from the queue freelist if there is one. */
      silc_list_start(tp->free_queue);
      t = silc_list_get(tp->free_queue);
      if (!t) {
        t = silc_scalloc(tp->stack, 1, sizeof(*t));
        if (!t) {
          silc_mutex_unlock(tp->lock);
          return FALSE;
        }
      } else {
        silc_list_del(tp->free_queue, t);
      }

      t->run = run;
      t->run_context = run_context;
      t->completion = completion;
      t->completion_context = completion_context;
      t->schedule = schedule;

      silc_list_add(tp->queue, t);
      silc_mutex_unlock(tp->lock);
      return TRUE;
    } else {
      /* Create new thread */
      t = silc_thread_pool_new_thread(tp);
      if (!t) {
        silc_mutex_unlock(tp->lock);
        return FALSE;
      }
    }
  }

  SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));

  /* Mark this call to be executed in this thread */
  t->run = run;
  t->run_context = run_context;
  t->completion = completion;
  t->completion_context = completion_context;
  t->schedule = schedule;
  silc_list_del(tp->free_threads, t);

  /* Signal threads */
  silc_cond_broadcast(tp->pool_signal);

  silc_mutex_unlock(tp->lock);
  return TRUE;
}

/* Set maximum threads in the pool */

void silc_thread_pool_set_max_threads(SilcThreadPool tp,
                                      SilcUInt32 max_threads)
{
  SILC_LOG_DEBUG(("Set thread pool %p max threads to %d", tp, max_threads));

  silc_mutex_lock(tp->lock);
  tp->max_threads = max_threads;
  silc_mutex_unlock(tp->lock);
}

/* Get maximum threads in the pool */

SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
{
  SilcUInt32 max_threads;

  silc_mutex_lock(tp->lock);
  max_threads = tp->max_threads;
  silc_mutex_unlock(tp->lock);

  return max_threads;
}

/* Get number of free threads in the pool */

SilcUInt32 silc_thread_pool_num_free_threads(SilcThreadPool tp)
{
  SilcUInt32 free_threads;

  silc_mutex_lock(tp->lock);
  free_threads = silc_list_count(tp->free_threads);
  silc_mutex_unlock(tp->lock);

  return free_threads;
}

/* Purge pool.  Stops free threads so that at most `min_threads' threads
   remain on the free threads list.  Threads that are executing a call are
   not touched. */

void silc_thread_pool_purge(SilcThreadPool tp)
{
  SilcThreadPoolThread t;
  int i;

  silc_mutex_lock(tp->lock);

  if (silc_list_count(tp->free_threads) <= tp->min_threads) {
    SILC_LOG_DEBUG(("No threads to purge"));
    silc_mutex_unlock(tp->lock);
    return;
  }

  i = silc_list_count(tp->free_threads) - tp->min_threads;

  SILC_LOG_DEBUG(("Purge %d threads", i));

  silc_list_start(tp->threads);
  while ((t = silc_list_get(tp->threads))) {
    /* Skip threads that are executing a call.  Skip also threads that an
       earlier purge already told to stop but which have not yet removed
       themselves from the pool; they are no longer on the free threads
       list and must not be counted again. */
    if (t->run || t->stop)
      continue;

    t->stop = TRUE;
    silc_list_del(tp->free_threads, t);

    i--;
    if (!i)
      break;
  }

  /* Signal threads to stop */
  silc_cond_broadcast(tp->pool_signal);

  silc_mutex_unlock(tp->lock);
}