X-Git-Url: http://git.silcnet.org/gitweb/?a=blobdiff_plain;f=lib%2Fsilcutil%2Fsilcthread.c;h=8e51a15d13ef4c182382cf029f3b0f41512c3deb;hb=e7b6c157b80152bf9fb9266e6bdd93f9fb0db776;hp=3a89206b0ededf9c4a01005f62c232bf4516cf6f;hpb=71c2c679c95ba07dc16517a280225aaf902f11a5;p=silc.git diff --git a/lib/silcutil/silcthread.c b/lib/silcutil/silcthread.c index 3a89206b..8e51a15d 100644 --- a/lib/silcutil/silcthread.c +++ b/lib/silcutil/silcthread.c @@ -19,6 +19,8 @@ #include "silc.h" +/***************************** Thread Pool API *****************************/ + /* Explanation of the thread pool execution. When new call is added to thread pool by calling silc_thread_pool_run @@ -48,6 +50,7 @@ typedef struct SilcThreadPoolThreadStruct { struct SilcThreadPoolThreadStruct *next; struct SilcThreadPoolThreadStruct *next2; SilcThreadPool tp; /* The thread pool */ + SilcCond thread_signal; /* Condition variable for signalling */ SilcMutex lock; /* Thread lock */ SilcList queue; /* Queue for waiting calls */ SilcList free_queue; /* Queue freelist */ @@ -62,8 +65,8 @@ typedef struct SilcThreadPoolThreadStruct { /* Thread pool context */ struct SilcThreadPoolStruct { SilcStack stack; /* Stack for memory allocation */ - SilcMutex lock; /* Pool lock */ SilcCond pool_signal; /* Condition variable for signalling */ + SilcMutex lock; /* Pool lock */ SilcList threads; /* Threads in the pool */ SilcList free_threads; /* Threads freelist */ SilcUInt16 min_threads; /* Minimum threads in the pool */ @@ -109,44 +112,21 @@ static void *silc_thread_pool_run_thread(void *context) { SilcThreadPoolThread t = context, o, q; SilcThreadPool tp = t->tp; - SilcMutex lock = tp->lock; - SilcCond pool_signal = tp->pool_signal; + SilcMutex lock = t->lock; + SilcCond thread_signal = t->thread_signal; silc_mutex_lock(lock); while (1) { /* Wait here for code to execute */ while (!t->run && !t->stop) - silc_cond_wait(pool_signal, lock); - - if (silc_unlikely(t->stop)) { - /* Stop the thread. Remove from threads list and free memory. */ - SILC_LOG_DEBUG(("Stop thread %p", t)); - silc_list_del(tp->threads, t); - silc_list_start(tp->threads); - - /* Clear thread's call queue. */ - silc_list_start(t->queue); - silc_list_start(t->free_queue); - while ((q = silc_list_get(t->queue))) - silc_sfree(tp->stack, q); - while ((q = silc_list_get(t->free_queue))) - silc_sfree(tp->stack, q); - - silc_mutex_free(t->lock); - silc_sfree(tp->stack, t); + silc_cond_wait(thread_signal, lock); - /* If we are last thread, signal the waiting destructor. */ - if (silc_list_count(tp->threads) == 0) - silc_cond_signal(pool_signal); - - /* Release pool reference. Releases lock also. */ - silc_thread_pool_unref(tp); - break; - } - silc_mutex_unlock(lock); + if (t->stop) + goto stop; /* Execute code */ + silc_mutex_unlock(lock); execute: SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run, t->run_context, t)); @@ -170,10 +150,14 @@ static void *silc_thread_pool_run_thread(void *context) } } + silc_mutex_lock(lock); + if (t->stop) + goto stop; + /* Check if there are calls in queue. Takes the most recently added call since new ones are added at the start of the list. 
*/ - silc_mutex_lock(t->lock); if (silc_list_count(t->queue) > 0) { + execute_queue: silc_list_start(t->queue); q = silc_list_get(t->queue); @@ -188,54 +172,100 @@ static void *silc_thread_pool_run_thread(void *context) silc_list_del(t->queue, q); silc_list_add(t->free_queue, q); - silc_mutex_unlock(t->lock); + silc_mutex_unlock(lock); goto execute; - } else { - silc_mutex_unlock(t->lock); + } + + silc_mutex_unlock(lock); + silc_mutex_lock(tp->lock); - /* Nothing to do. Attempt to steal call from some other thread. */ - silc_mutex_lock(lock); + /* Nothing to do. Attempt to steal call from some other thread. */ + o = silc_list_get(tp->threads); + if (!o) { + /* List wraps around */ + silc_list_start(tp->threads); o = silc_list_get(tp->threads); - if (!o) { - /* List wraps around */ - silc_list_start(tp->threads); - o = silc_list_get(tp->threads); - } - silc_mutex_unlock(lock); + } - if (o && o != t) { - silc_mutex_lock(o->lock); - if (silc_list_count(o->queue) > 0) { - silc_list_start(o->queue); - q = silc_list_get(o->queue); - - SILC_LOG_DEBUG(("Execute call from queue from thread %p", o)); - - /* Execute this call now */ - t->run = q->run; - t->run_context = q->run_context; - t->completion = q->completion; - t->completion_context = q->completion_context; - t->schedule = q->schedule; - - silc_list_del(o->queue, q); - silc_list_add(o->free_queue, q); - silc_mutex_unlock(o->lock); - goto execute; - } - silc_mutex_unlock(o->lock); - } + /* Check that the other thread is valid and has something to execute. */ + silc_mutex_lock(o->lock); + if (o == t || o->stop || silc_list_count(o->queue) == 0) { + silc_mutex_unlock(o->lock); + o = NULL; + } + + if (o) { + silc_mutex_unlock(tp->lock); + silc_list_start(o->queue); + q = silc_list_get(o->queue); + + SILC_LOG_DEBUG(("Execute call from queue from thread %p", o)); + + /* Execute this call now */ + t->run = q->run; + t->run_context = q->run_context; + t->completion = q->completion; + t->completion_context = q->completion_context; + t->schedule = q->schedule; + + silc_list_del(o->queue, q); + silc_list_add(o->free_queue, q); + silc_mutex_unlock(o->lock); + goto execute; + } + + silc_mutex_lock(lock); + if (t->stop) { + silc_mutex_unlock(tp->lock); + goto stop; + } + + /* Now that we have the lock back, check the queue again. */ + if (silc_list_count(t->queue) > 0) { + silc_mutex_unlock(tp->lock); + goto execute_queue; } /* The thread is now free for use again. */ t->run = NULL; t->completion = NULL; t->schedule = NULL; - - silc_mutex_lock(lock); silc_list_add(tp->free_threads, t); + silc_mutex_unlock(tp->lock); } + stop: + /* Stop the thread. Remove from threads list. */ + SILC_LOG_DEBUG(("Stop thread %p", t)); + + /* We can unlock the thread now. After we get the thread pool lock + no one can retrieve the thread anymore. */ + silc_mutex_unlock(lock); + silc_mutex_lock(tp->lock); + + silc_list_del(tp->threads, t); + silc_list_start(tp->threads); + + /* Clear thread's call queue. */ + silc_list_start(t->queue); + silc_list_start(t->free_queue); + while ((q = silc_list_get(t->queue))) + silc_sfree(tp->stack, q); + while ((q = silc_list_get(t->free_queue))) + silc_sfree(tp->stack, q); + + /* Destroy the thread */ + silc_mutex_free(lock); + silc_cond_free(thread_signal); + silc_sfree(tp->stack, t); + + /* If we are last thread, signal the waiting destructor. */ + if (silc_list_count(tp->threads) == 0) + silc_cond_signal(tp->pool_signal); + + /* Release pool reference. Releases lock also. 
*/ + silc_thread_pool_unref(tp); + return NULL; } @@ -254,6 +284,12 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp) return NULL; } + if (!silc_cond_alloc(&t->thread_signal)) { + silc_mutex_free(t->lock); + silc_sfree(tp->stack, t); + return NULL; + } + t->tp = tp; silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next); silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next); @@ -283,10 +319,16 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack, SilcThreadPool tp; int i; - if (max_threads < min_threads) + if (max_threads < min_threads) { + silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, + "Max threads is smaller than min threads (%d < %d)", + max_threads, min_threads); return NULL; - if (!max_threads) + } + if (!max_threads) { + silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0"); return NULL; + } if (stack) stack = silc_stack_alloc(0, stack); @@ -342,9 +384,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished) /* Stop threads */ silc_list_start(tp->threads); - while ((t = silc_list_get(tp->threads))) + while ((t = silc_list_get(tp->threads))) { + silc_mutex_lock(t->lock); t->stop = TRUE; - silc_cond_broadcast(tp->pool_signal); + silc_cond_signal(t->thread_signal); + silc_mutex_unlock(t->lock); + } if (wait_unfinished) { SILC_LOG_DEBUG(("Wait threads to finish")); @@ -372,17 +417,19 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, if (tp->destroy) { silc_mutex_unlock(tp->lock); + silc_set_errno(SILC_ERR_NOT_VALID); return FALSE; } /* Get free thread */ silc_list_start(tp->free_threads); t = silc_list_get(tp->free_threads); - if (!t) { + if (!t || t->stop) { if (silc_list_count(tp->threads) + 1 > tp->max_threads) { /* Maximum threads reached */ if (!queuable) { silc_mutex_unlock(tp->lock); + silc_set_errno(SILC_ERR_LIMIT); return FALSE; } @@ -394,12 +441,11 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, silc_list_start(tp->threads); t = silc_list_get(tp->threads); } + silc_mutex_unlock(tp->lock); SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p", run, run_context, t)); - /* Lock the thread. Keep also thread pool lock so that this thread - cannot become free while we're adding call to its queue. */ silc_mutex_lock(t->lock); /* Get free call context from the list */ @@ -409,7 +455,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, q = silc_scalloc(tp->stack, 1, sizeof(*q)); if (!q) { silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return FALSE; } } else { @@ -425,7 +470,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, /* Add at the start of the list. It gets executed first. 
*/ silc_list_insert(t->queue, NULL, q); silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return TRUE; } else { /* Create new thread */ @@ -437,20 +481,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, } } + silc_list_del(tp->free_threads, t); + silc_mutex_unlock(tp->lock); + SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t)); + silc_mutex_lock(t->lock); + /* Mark this call to be executed in this thread */ t->run = run; t->run_context = run_context; t->completion = completion; t->completion_context = completion_context; t->schedule = schedule; - silc_list_del(tp->free_threads, t); - /* Signal threads */ - silc_cond_broadcast(tp->pool_signal); + /* Signal the thread */ + silc_cond_signal(t->thread_signal); + silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return TRUE; } @@ -519,7 +567,9 @@ void silc_thread_pool_purge(SilcThreadPool tp) continue; } + /* Signal the thread to stop */ t->stop = TRUE; + silc_cond_signal(t->thread_signal); silc_mutex_unlock(t->lock); silc_list_del(tp->free_threads, t); @@ -529,9 +579,30 @@ void silc_thread_pool_purge(SilcThreadPool tp) break; } - /* Signal threads to stop */ - silc_cond_broadcast(tp->pool_signal); - silc_list_start(tp->threads); silc_mutex_unlock(tp->lock); } + +/*************************** Thread-local Storage ***************************/ + +void silc_thread_tls_set(void *context) +{ + SilcTls tls = silc_thread_get_tls(); + + if (!tls) { + /* Initialize Tls for this thread */ + tls = silc_thread_tls_init(); + if (!tls) + return; + } + + tls->thread_context = context; +} + +void *silc_thread_tls_get(void) +{ + SilcTls tls = silc_thread_get_tls(); + if (!tls) + return NULL; + return tls->thread_context; +}
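
Usage note (not part of the patch): with this change each pool thread owns its own lock and condition variable, so silc_thread_pool_run() signals only the thread it picked instead of broadcasting pool_signal to every thread, and the new silc_thread_tls_set()/silc_thread_tls_get() helpers give code running in a pool thread a per-thread context pointer. The sketch below is a hypothetical caller-side example of this API: the silc_thread_tls_set()/silc_thread_tls_get() prototypes match the definitions added above, while the argument order of silc_thread_pool_alloc() and silc_thread_pool_run() and the SilcThreadPoolFunc callback form are assumptions based on silcthread.h and may differ.

/* Hypothetical usage sketch, not part of the patch.  The TLS prototypes
   match the definitions above; the silc_thread_pool_alloc() and
   silc_thread_pool_run() argument order and the callback signature are
   assumed from silcthread.h and may differ. */

#include "silc.h"

/* Call executed inside a pool thread (assumed SilcThreadPoolFunc form) */
static void worker(SilcSchedule schedule, void *context)
{
  void *per_thread;

  /* Store a per-thread context pointer with the new TLS helpers */
  silc_thread_tls_set(context);

  /* Anything running later in this same thread can fetch it back */
  per_thread = silc_thread_tls_get();
  (void)schedule;
  (void)per_thread;
}

int main(void)
{
  SilcThreadPool tp;
  static int job = 1;

  /* Pool that keeps at least 2 and at most 8 threads (assumed order) */
  tp = silc_thread_pool_alloc(NULL, 2, 8, TRUE);
  if (!tp)
    return 1;

  /* Dispatch one call; TRUE = queue it if every thread is busy.  With
     this patch only the selected thread's condition variable is
     signalled, instead of broadcasting to the whole pool. */
  if (!silc_thread_pool_run(tp, TRUE, NULL, worker, &job, NULL, NULL)) {
    silc_thread_pool_free(tp, FALSE);
    return 1;
  }

  /* Wait for unfinished calls, then destroy the pool */
  silc_thread_pool_free(tp, TRUE);
  return 0;
}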