#include "silc.h"
+/***************************** Thread Pool API *****************************/
+
+/* Explanation of the thread pool execution.
+
+ When new call is added to thread pool by calling silc_thread_pool_run
+ it is assigned to a first free thread from the free list. If no threads
+ are available we take one from the threads list and assign the call to
+ its queue. The threads list always takes different thread finally wrapping
+ from the beginning. This way each thread will get a chance to execute
+ queued calls.
+
+ The thread function silc_thread_pool_run_thread executes each call. After
+ executing the current call that has been assigned to it, it will check
+ if there are any queued calls in its queue, and it will execute all calls
+ from the queue. If there aren't any calls in the queue, it will attempt
+ to steal a call from some other thread and execute it.
+
+ The queue list is always consumed in last-in-first-out order. The most
+ recently added call gets priority. With full utilization this helps to
+ avoid CPU cache misses. Since the queues are thread specific with full
+ utilization each thread should always be doing work for the most recent
+ (and thus most important) calls. */
+
/************************** Types and definitions ***************************/
-/* Thread pool thread context */
+/* Thread pool thread context. Each thread contains the most current call
+ to be executed, and a list of queued calls. */
typedef struct SilcThreadPoolThreadStruct {
struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
- SilcSchedule schedule; /* Scheduler, may be NULL */
- SilcThreadPoolFunc run; /* The function to run in a thread */
- SilcTaskCallback completion; /* Completion function */
+ SilcCond thread_signal; /* Condition variable for signalling */
+ SilcMutex lock; /* Thread lock */
+ SilcList queue; /* Queue for waiting calls */
+ SilcList free_queue; /* Queue freelist */
+ SilcSchedule schedule; /* The current Scheduler, may be NULL */
+ SilcThreadPoolFunc run; /* The current call to run in a thread */
+ SilcTaskCallback completion; /* The current Completion function */
void *run_context;
void *completion_context;
unsigned int stop : 1; /* Set to stop the thread */
/* Thread pool context */
struct SilcThreadPoolStruct {
SilcStack stack; /* Stack for memory allocation */
- SilcMutex lock; /* Pool lock */
SilcCond pool_signal; /* Condition variable for signalling */
+ SilcMutex lock; /* Pool lock */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
- SilcList queue; /* Queue for waiting calls */
SilcUInt16 min_threads; /* Minimum threads in the pool */
SilcUInt16 max_threads; /* Maximum threads in the pool */
SilcUInt16 refcnt; /* Reference counter */
static void *silc_thread_pool_run_thread(void *context)
{
- SilcThreadPoolThread t = context, q;
+ SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
- SilcMutex lock = tp->lock;
- SilcCond pool_signal = tp->pool_signal;
+ SilcMutex lock = t->lock;
+ SilcCond thread_signal = t->thread_signal;
silc_mutex_lock(lock);
while (1) {
/* Wait here for code to execute */
while (!t->run && !t->stop)
- silc_cond_wait(pool_signal, lock);
-
- if (t->stop) {
- /* Stop the thread. Remove from threads list and free memory. */
- SILC_LOG_DEBUG(("Stop thread %p", t));
- silc_list_del(tp->threads, t);
- silc_sfree(tp->stack, t);
+ silc_cond_wait(thread_signal, lock);
- /* If we are last thread, signal the waiting destructor. */
- if (silc_list_count(tp->threads) == 0)
- silc_cond_broadcast(pool_signal);
-
- /* Release pool reference. Releases lock also. */
- silc_thread_pool_unref(tp);
- break;
- }
- silc_mutex_unlock(lock);
+ if (t->stop)
+ goto stop;
/* Execute code */
+ silc_mutex_unlock(lock);
+ execute:
SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
t->run_context, t));
t->run(t->schedule, t->run_context);
}
silc_mutex_lock(lock);
+ if (t->stop)
+ goto stop;
- /* Check if there are calls in queue */
- if (silc_list_count(tp->queue) > 0) {
- silc_list_start(tp->queue);
- q = silc_list_get(tp->queue);
+ /* Check if there are calls in queue. Takes the most recently added
+ call since new ones are added at the start of the list. */
+ if (silc_list_count(t->queue) > 0) {
+ execute_queue:
+ silc_list_start(t->queue);
+ q = silc_list_get(t->queue);
SILC_LOG_DEBUG(("Execute call from queue"));
t->completion_context = q->completion_context;
t->schedule = q->schedule;
- silc_list_del(tp->queue, q);
- silc_sfree(tp->stack, q);
- continue;
+ silc_list_del(t->queue, q);
+ silc_list_add(t->free_queue, q);
+ silc_mutex_unlock(lock);
+ goto execute;
+ }
+
+ silc_mutex_unlock(lock);
+ silc_mutex_lock(tp->lock);
+
+ /* Nothing to do. Attempt to steal call from some other thread. */
+ o = silc_list_get(tp->threads);
+ if (!o) {
+ /* List wraps around */
+ silc_list_start(tp->threads);
+ o = silc_list_get(tp->threads);
+ }
+
+ /* Check that the other thread is valid and has something to execute. */
+ silc_mutex_lock(o->lock);
+ if (o == t || o->stop || silc_list_count(o->queue) == 0) {
+ silc_mutex_unlock(o->lock);
+ o = NULL;
+ }
+
+ if (o) {
+ silc_mutex_unlock(tp->lock);
+ silc_list_start(o->queue);
+ q = silc_list_get(o->queue);
+
+ SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+ /* Execute this call now */
+ t->run = q->run;
+ t->run_context = q->run_context;
+ t->completion = q->completion;
+ t->completion_context = q->completion_context;
+ t->schedule = q->schedule;
+
+ silc_list_del(o->queue, q);
+ silc_list_add(o->free_queue, q);
+ silc_mutex_unlock(o->lock);
+ goto execute;
+ }
+
+ silc_mutex_lock(lock);
+ if (t->stop) {
+ silc_mutex_unlock(tp->lock);
+ goto stop;
+ }
+
+ /* Now that we have the lock back, check the queue again. */
+ if (silc_list_count(t->queue) > 0) {
+ silc_mutex_unlock(tp->lock);
+ goto execute_queue;
}
/* The thread is now free for use again. */
t->completion = NULL;
t->schedule = NULL;
silc_list_add(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
}
+ stop:
+ /* Stop the thread. Remove from threads list. */
+ SILC_LOG_DEBUG(("Stop thread %p", t));
+
+ /* We can unlock the thread now. After we get the thread pool lock
+ no one can retrieve the thread anymore. */
+ silc_mutex_unlock(lock);
+ silc_mutex_lock(tp->lock);
+
+ silc_list_del(tp->threads, t);
+ silc_list_start(tp->threads);
+
+ /* Clear thread's call queue. */
+ silc_list_start(t->queue);
+ silc_list_start(t->free_queue);
+ while ((q = silc_list_get(t->queue)))
+ silc_sfree(tp->stack, q);
+ while ((q = silc_list_get(t->free_queue)))
+ silc_sfree(tp->stack, q);
+
+ /* Destroy the thread */
+ silc_mutex_free(lock);
+ silc_cond_free(thread_signal);
+ silc_sfree(tp->stack, t);
+
+ /* If we are last thread, signal the waiting destructor. */
+ if (silc_list_count(tp->threads) == 0)
+ silc_cond_signal(tp->pool_signal);
+
+ /* Release pool reference. Releases lock also. */
+ silc_thread_pool_unref(tp);
+
return NULL;
}
t = silc_scalloc(tp->stack, 1, sizeof(*t));
if (!t)
return NULL;
+
+ if (!silc_mutex_alloc(&t->lock)) {
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
+ if (!silc_cond_alloc(&t->thread_signal)) {
+ silc_mutex_free(t->lock);
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
t->tp = tp;
+ silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+ silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+ /* Add to thread pool */
silc_list_add(tp->threads, t);
silc_list_add(tp->free_threads, t);
silc_thread_pool_ref(tp);
SilcThreadPool tp;
int i;
- if (max_threads < min_threads)
+ if (max_threads < min_threads) {
+ silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
+ "Max threads is smaller than min threads (%d < %d)",
+ max_threads, min_threads);
return NULL;
+ }
+ if (!max_threads) {
+ silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
+ return NULL;
+ }
if (stack)
stack = silc_stack_alloc(0, stack);
silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
- silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
for (i = 0; i < tp->min_threads && start_min_threads; i++)
silc_thread_pool_new_thread(tp);
+ silc_list_start(tp->threads);
+
return tp;
}
/* Stop threads */
silc_list_start(tp->threads);
- while ((t = silc_list_get(tp->threads)))
+ while ((t = silc_list_get(tp->threads))) {
+ silc_mutex_lock(t->lock);
t->stop = TRUE;
- silc_cond_broadcast(tp->pool_signal);
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
+ }
if (wait_unfinished) {
SILC_LOG_DEBUG(("Wait threads to finish"));
silc_cond_wait(tp->pool_signal, tp->lock);
}
- /* Free calls from queue */
- silc_list_start(tp->queue);
- while ((t = silc_list_get(tp->queue)))
- silc_sfree(tp->stack, t);
- silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-
/* Release reference. Releases lock also. */
silc_thread_pool_unref(tp);
}
SilcTaskCallback completion,
void *completion_context)
{
- SilcThreadPoolThread t;
+ SilcThreadPoolThread t, q;
silc_mutex_lock(tp->lock);
if (tp->destroy) {
silc_mutex_unlock(tp->lock);
+ silc_set_errno(SILC_ERR_NOT_VALID);
return FALSE;
}
/* Get free thread */
silc_list_start(tp->free_threads);
t = silc_list_get(tp->free_threads);
- if (!t) {
+ if (!t || t->stop) {
if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
/* Maximum threads reached */
if (!queuable) {
silc_mutex_unlock(tp->lock);
+ silc_set_errno(SILC_ERR_LIMIT);
return FALSE;
}
- SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
- /* User wants to queue this call until thread becomes free */
- t = silc_scalloc(tp->stack, 1, sizeof(*t));
+ /* User wants to queue this call until thread becomes free. Get
+ a thread to assign this call. */
+ t = silc_list_get(tp->threads);
if (!t) {
- silc_mutex_unlock(tp->lock);
- return FALSE;
+ /* List wraps around */
+ silc_list_start(tp->threads);
+ t = silc_list_get(tp->threads);
}
+ silc_mutex_unlock(tp->lock);
- t->run = run;
- t->run_context = run_context;
- t->completion = completion;
- t->completion_context = completion_context;
- t->schedule = schedule;
+ SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+ run, run_context, t));
- silc_list_add(tp->queue, t);
- silc_mutex_unlock(tp->lock);
+ silc_mutex_lock(t->lock);
+
+ /* Get free call context from the list */
+ silc_list_start(t->free_queue);
+ q = silc_list_get(t->free_queue);
+ if (!q) {
+ q = silc_scalloc(tp->stack, 1, sizeof(*q));
+ if (!q) {
+ silc_mutex_unlock(t->lock);
+ return FALSE;
+ }
+ } else {
+ silc_list_del(t->free_queue, q);
+ }
+
+ q->run = run;
+ q->run_context = run_context;
+ q->completion = completion;
+ q->completion_context = completion_context;
+ q->schedule = schedule;
+
+ /* Add at the start of the list. It gets executed first. */
+ silc_list_insert(t->queue, NULL, q);
+ silc_mutex_unlock(t->lock);
return TRUE;
} else {
/* Create new thread */
}
}
+ silc_list_del(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
+
SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
+ silc_mutex_lock(t->lock);
+
/* Mark this call to be executed in this thread */
t->run = run;
t->run_context = run_context;
t->completion = completion;
t->completion_context = completion_context;
t->schedule = schedule;
- silc_list_del(tp->free_threads, t);
- /* Signal threads */
- silc_cond_broadcast(tp->pool_signal);
+ /* Signal the thread */
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
}
silc_list_start(tp->threads);
while ((t = silc_list_get(tp->threads))) {
- if (t->run)
+ silc_mutex_lock(t->lock);
+ if (t->run) {
+ silc_mutex_unlock(t->lock);
continue;
+ }
+ /* Signal the thread to stop */
t->stop = TRUE;
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
+
silc_list_del(tp->free_threads, t);
i--;
break;
}
- /* Signal threads to stop */
- silc_cond_broadcast(tp->pool_signal);
-
+ silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}
+
+/*************************** Thread-local Storage ***************************/
+
+void silc_thread_tls_set(void *context)
+{
+ SilcTls tls = silc_thread_get_tls();
+
+ if (!tls) {
+ /* Initialize Tls for this thread */
+ tls = silc_thread_tls_init();
+ if (!tls)
+ return;
+ }
+
+ tls->thread_context = context;
+}
+
+void *silc_thread_tls_get(void)
+{
+ SilcTls tls = silc_thread_get_tls();
+ if (!tls)
+ return NULL;
+ return tls->thread_context;
+}