#include "silc.h"
+/***************************** Thread Pool API *****************************/
+
/* Explanation of the thread pool execution.
When new call is added to thread pool by calling silc_thread_pool_run
struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
+ SilcCond thread_signal; /* Condition variable for signalling */
SilcMutex lock; /* Thread lock */
SilcList queue; /* Queue for waiting calls */
SilcList free_queue; /* Queue freelist */
/* Thread pool context */
struct SilcThreadPoolStruct {
SilcStack stack; /* Stack for memory allocation */
- SilcMutex lock; /* Pool lock */
SilcCond pool_signal; /* Condition variable for signalling */
+ SilcMutex lock; /* Pool lock */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
SilcUInt16 min_threads; /* Minimum threads in the pool */
{
SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
- SilcMutex lock = tp->lock;
- SilcCond pool_signal = tp->pool_signal;
+ SilcMutex lock = t->lock;
+ SilcCond thread_signal = t->thread_signal;
silc_mutex_lock(lock);
while (1) {
/* Wait here for code to execute */
while (!t->run && !t->stop)
- silc_cond_wait(pool_signal, lock);
-
- if (silc_unlikely(t->stop)) {
- /* Stop the thread. Remove from threads list and free memory. */
- SILC_LOG_DEBUG(("Stop thread %p", t));
- silc_list_del(tp->threads, t);
- silc_list_start(tp->threads);
-
- /* Clear thread's call queue. */
- silc_list_start(t->queue);
- silc_list_start(t->free_queue);
- while ((q = silc_list_get(t->queue)))
- silc_sfree(tp->stack, q);
- while ((q = silc_list_get(t->free_queue)))
- silc_sfree(tp->stack, q);
-
- silc_mutex_free(t->lock);
- silc_sfree(tp->stack, t);
+ silc_cond_wait(thread_signal, lock);
- /* If we are last thread, signal the waiting destructor. */
- if (silc_list_count(tp->threads) == 0)
- silc_cond_signal(pool_signal);
-
- /* Release pool reference. Releases lock also. */
- silc_thread_pool_unref(tp);
- break;
- }
- silc_mutex_unlock(lock);
+ if (t->stop)
+ goto stop;
/* Execute code */
+ silc_mutex_unlock(lock);
execute:
SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
t->run_context, t));
}
}
+ silc_mutex_lock(lock);
+ if (t->stop)
+ goto stop;
+
/* Check if there are calls in queue. Takes the most recently added
call since new ones are added at the start of the list. */
- silc_mutex_lock(t->lock);
if (silc_list_count(t->queue) > 0) {
+ execute_queue:
silc_list_start(t->queue);
q = silc_list_get(t->queue);
silc_list_del(t->queue, q);
silc_list_add(t->free_queue, q);
- silc_mutex_unlock(t->lock);
+ silc_mutex_unlock(lock);
goto execute;
- } else {
- silc_mutex_unlock(t->lock);
+ }
+
+ silc_mutex_unlock(lock);
+ silc_mutex_lock(tp->lock);
- /* Nothing to do. Attempt to steal call from some other thread. */
- silc_mutex_lock(lock);
+ /* Nothing to do. Attempt to steal call from some other thread. */
+ o = silc_list_get(tp->threads);
+ if (!o) {
+ /* List wraps around */
+ silc_list_start(tp->threads);
o = silc_list_get(tp->threads);
- if (!o) {
- /* List wraps around */
- silc_list_start(tp->threads);
- o = silc_list_get(tp->threads);
- }
- silc_mutex_unlock(lock);
+ }
- if (o && o != t) {
- silc_mutex_lock(o->lock);
- if (silc_list_count(o->queue) > 0) {
- silc_list_start(o->queue);
- q = silc_list_get(o->queue);
-
- SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
-
- /* Execute this call now */
- t->run = q->run;
- t->run_context = q->run_context;
- t->completion = q->completion;
- t->completion_context = q->completion_context;
- t->schedule = q->schedule;
-
- silc_list_del(o->queue, q);
- silc_list_add(o->free_queue, q);
- silc_mutex_unlock(o->lock);
- goto execute;
- }
- silc_mutex_unlock(o->lock);
- }
+ /* Check that the other thread is valid and has something to execute. */
+ silc_mutex_lock(o->lock);
+ if (o == t || o->stop || silc_list_count(o->queue) == 0) {
+ silc_mutex_unlock(o->lock);
+ o = NULL;
+ }
+
+ if (o) {
+ silc_mutex_unlock(tp->lock);
+ silc_list_start(o->queue);
+ q = silc_list_get(o->queue);
+
+ SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+ /* Execute this call now */
+ t->run = q->run;
+ t->run_context = q->run_context;
+ t->completion = q->completion;
+ t->completion_context = q->completion_context;
+ t->schedule = q->schedule;
+
+ silc_list_del(o->queue, q);
+ silc_list_add(o->free_queue, q);
+ silc_mutex_unlock(o->lock);
+ goto execute;
+ }
+
+ silc_mutex_lock(lock);
+ if (t->stop) {
+ silc_mutex_unlock(tp->lock);
+ goto stop;
+ }
+
+ /* Now that we have the lock back, check the queue again. */
+ if (silc_list_count(t->queue) > 0) {
+ silc_mutex_unlock(tp->lock);
+ goto execute_queue;
}
/* The thread is now free for use again. */
t->run = NULL;
t->completion = NULL;
t->schedule = NULL;
-
- silc_mutex_lock(lock);
silc_list_add(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
}
+ stop:
+ /* Stop the thread. Remove from threads list. */
+ SILC_LOG_DEBUG(("Stop thread %p", t));
+
+ /* We can unlock the thread now. After we get the thread pool lock
+ no one can retrieve the thread anymore. */
+ silc_mutex_unlock(lock);
+ silc_mutex_lock(tp->lock);
+
+ silc_list_del(tp->threads, t);
+ silc_list_start(tp->threads);
+
+ /* Clear thread's call queue. */
+ silc_list_start(t->queue);
+ silc_list_start(t->free_queue);
+ while ((q = silc_list_get(t->queue)))
+ silc_sfree(tp->stack, q);
+ while ((q = silc_list_get(t->free_queue)))
+ silc_sfree(tp->stack, q);
+
+ /* Destroy the thread */
+ silc_mutex_free(lock);
+ silc_cond_free(thread_signal);
+ silc_sfree(tp->stack, t);
+
+ /* If we are last thread, signal the waiting destructor. */
+ if (silc_list_count(tp->threads) == 0)
+ silc_cond_signal(tp->pool_signal);
+
+ /* Release pool reference. Releases lock also. */
+ silc_thread_pool_unref(tp);
+
return NULL;
}
return NULL;
}
+ if (!silc_cond_alloc(&t->thread_signal)) {
+ silc_mutex_free(t->lock);
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
t->tp = tp;
silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
SilcThreadPool tp;
int i;
- if (max_threads < min_threads)
+ if (max_threads < min_threads) {
+ silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
+ "Max threads is smaller than min threads (%d < %d)",
+ max_threads, min_threads);
return NULL;
- if (!max_threads)
+ }
+ if (!max_threads) {
+ silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
return NULL;
+ }
if (stack)
stack = silc_stack_alloc(0, stack);
/* Stop threads */
silc_list_start(tp->threads);
- while ((t = silc_list_get(tp->threads)))
+ while ((t = silc_list_get(tp->threads))) {
+ silc_mutex_lock(t->lock);
t->stop = TRUE;
- silc_cond_broadcast(tp->pool_signal);
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
+ }
if (wait_unfinished) {
SILC_LOG_DEBUG(("Wait threads to finish"));
if (tp->destroy) {
silc_mutex_unlock(tp->lock);
+ silc_set_errno(SILC_ERR_NOT_VALID);
return FALSE;
}
/* Get free thread */
silc_list_start(tp->free_threads);
t = silc_list_get(tp->free_threads);
- if (!t) {
+ if (!t || t->stop) {
if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
/* Maximum threads reached */
if (!queuable) {
silc_mutex_unlock(tp->lock);
+ silc_set_errno(SILC_ERR_LIMIT);
return FALSE;
}
silc_list_start(tp->threads);
t = silc_list_get(tp->threads);
}
+ silc_mutex_unlock(tp->lock);
SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
run, run_context, t));
- /* Lock the thread. Keep also thread pool lock so that this thread
- cannot become free while we're adding call to its queue. */
silc_mutex_lock(t->lock);
/* Get free call context from the list */
q = silc_scalloc(tp->stack, 1, sizeof(*q));
if (!q) {
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return FALSE;
}
} else {
/* Add at the start of the list. It gets executed first. */
silc_list_insert(t->queue, NULL, q);
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
} else {
/* Create new thread */
}
}
+ silc_list_del(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
+
SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
+ silc_mutex_lock(t->lock);
+
/* Mark this call to be executed in this thread */
t->run = run;
t->run_context = run_context;
t->completion = completion;
t->completion_context = completion_context;
t->schedule = schedule;
- silc_list_del(tp->free_threads, t);
- /* Signal threads */
- silc_cond_broadcast(tp->pool_signal);
+ /* Signal the thread */
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
}
continue;
}
+ /* Signal the thread to stop */
t->stop = TRUE;
+ silc_cond_signal(t->thread_signal);
silc_mutex_unlock(t->lock);
silc_list_del(tp->free_threads, t);
break;
}
- /* Signal threads to stop */
- silc_cond_broadcast(tp->pool_signal);
-
silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}
+
+/*************************** Thread-local Storage ***************************/
+
+void silc_thread_tls_set(void *context)
+{
+ SilcTls tls = silc_thread_get_tls();
+
+ if (!tls) {
+ /* Initialize Tls for this thread */
+ tls = silc_thread_tls_init();
+ if (!tls)
+ return;
+ }
+
+ tls->thread_context = context;
+}
+
+void *silc_thread_tls_get(void)
+{
+ SilcTls tls = silc_thread_get_tls();
+ if (!tls)
+ return NULL;
+ return tls->thread_context;
+}