From 084e7bacdcfadae09af57905625d5aaed63832f3 Mon Sep 17 00:00:00 2001 From: Pekka Riikonen Date: Thu, 26 Jul 2007 18:18:41 +0000 Subject: [PATCH] Changed condition variables thread specific. --- lib/silcutil/silcthread.c | 73 ++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/lib/silcutil/silcthread.c b/lib/silcutil/silcthread.c index 3a89206b..ad53a421 100644 --- a/lib/silcutil/silcthread.c +++ b/lib/silcutil/silcthread.c @@ -48,6 +48,7 @@ typedef struct SilcThreadPoolThreadStruct { struct SilcThreadPoolThreadStruct *next; struct SilcThreadPoolThreadStruct *next2; SilcThreadPool tp; /* The thread pool */ + SilcCond thread_signal; /* Condition variable for signalling */ SilcMutex lock; /* Thread lock */ SilcList queue; /* Queue for waiting calls */ SilcList free_queue; /* Queue freelist */ @@ -62,8 +63,8 @@ typedef struct SilcThreadPoolThreadStruct { /* Thread pool context */ struct SilcThreadPoolStruct { SilcStack stack; /* Stack for memory allocation */ - SilcMutex lock; /* Pool lock */ SilcCond pool_signal; /* Condition variable for signalling */ + SilcMutex lock; /* Pool lock */ SilcList threads; /* Threads in the pool */ SilcList free_threads; /* Threads freelist */ SilcUInt16 min_threads; /* Minimum threads in the pool */ @@ -109,19 +110,20 @@ static void *silc_thread_pool_run_thread(void *context) { SilcThreadPoolThread t = context, o, q; SilcThreadPool tp = t->tp; - SilcMutex lock = tp->lock; - SilcCond pool_signal = tp->pool_signal; + SilcMutex lock = t->lock; + SilcCond thread_signal = t->thread_signal; silc_mutex_lock(lock); while (1) { /* Wait here for code to execute */ while (!t->run && !t->stop) - silc_cond_wait(pool_signal, lock); + silc_cond_wait(thread_signal, lock); if (silc_unlikely(t->stop)) { - /* Stop the thread. Remove from threads list and free memory. */ + /* Stop the thread. Remove from threads list. */ SILC_LOG_DEBUG(("Stop thread %p", t)); + silc_mutex_lock(tp->lock); silc_list_del(tp->threads, t); silc_list_start(tp->threads); @@ -133,12 +135,15 @@ static void *silc_thread_pool_run_thread(void *context) while ((q = silc_list_get(t->free_queue))) silc_sfree(tp->stack, q); - silc_mutex_free(t->lock); + /* Destroy the thread */ + silc_mutex_unlock(lock); + silc_mutex_free(lock); + silc_cond_free(thread_signal); silc_sfree(tp->stack, t); /* If we are last thread, signal the waiting destructor. */ if (silc_list_count(tp->threads) == 0) - silc_cond_signal(pool_signal); + silc_cond_signal(tp->pool_signal); /* Release pool reference. Releases lock also. */ silc_thread_pool_unref(tp); @@ -172,8 +177,9 @@ static void *silc_thread_pool_run_thread(void *context) /* Check if there are calls in queue. Takes the most recently added call since new ones are added at the start of the list. */ - silc_mutex_lock(t->lock); + silc_mutex_lock(lock); if (silc_list_count(t->queue) > 0) { + execute_queue: silc_list_start(t->queue); q = silc_list_get(t->queue); @@ -188,20 +194,20 @@ static void *silc_thread_pool_run_thread(void *context) silc_list_del(t->queue, q); silc_list_add(t->free_queue, q); - silc_mutex_unlock(t->lock); + silc_mutex_unlock(lock); goto execute; } else { - silc_mutex_unlock(t->lock); + silc_mutex_unlock(lock); /* Nothing to do. Attempt to steal call from some other thread. */ - silc_mutex_lock(lock); + silc_mutex_lock(tp->lock); o = silc_list_get(tp->threads); if (!o) { /* List wraps around */ silc_list_start(tp->threads); o = silc_list_get(tp->threads); } - silc_mutex_unlock(lock); + silc_mutex_unlock(tp->lock); if (o && o != t) { silc_mutex_lock(o->lock); @@ -227,12 +233,16 @@ static void *silc_thread_pool_run_thread(void *context) } } + silc_mutex_lock(lock); + + /* Now that we have the lock back, check the queue again. */ + if (silc_list_count(t->queue) > 0) + goto execute_queue; + /* The thread is now free for use again. */ t->run = NULL; t->completion = NULL; t->schedule = NULL; - - silc_mutex_lock(lock); silc_list_add(tp->free_threads, t); } @@ -254,6 +264,12 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp) return NULL; } + if (!silc_cond_alloc(&t->thread_signal)) { + silc_mutex_free(t->lock); + silc_sfree(tp->stack, t); + return NULL; + } + t->tp = tp; silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next); silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next); @@ -342,9 +358,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished) /* Stop threads */ silc_list_start(tp->threads); - while ((t = silc_list_get(tp->threads))) + while ((t = silc_list_get(tp->threads))) { + silc_mutex_lock(t->lock); t->stop = TRUE; - silc_cond_broadcast(tp->pool_signal); + silc_cond_signal(t->thread_signal); + silc_mutex_unlock(t->lock); + } if (wait_unfinished) { SILC_LOG_DEBUG(("Wait threads to finish")); @@ -394,12 +413,11 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, silc_list_start(tp->threads); t = silc_list_get(tp->threads); } + silc_mutex_unlock(tp->lock); SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p", run, run_context, t)); - /* Lock the thread. Keep also thread pool lock so that this thread - cannot become free while we're adding call to its queue. */ silc_mutex_lock(t->lock); /* Get free call context from the list */ @@ -409,7 +427,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, q = silc_scalloc(tp->stack, 1, sizeof(*q)); if (!q) { silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return FALSE; } } else { @@ -425,7 +442,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, /* Add at the start of the list. It gets executed first. */ silc_list_insert(t->queue, NULL, q); silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return TRUE; } else { /* Create new thread */ @@ -437,20 +453,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp, } } + silc_list_del(tp->free_threads, t); + silc_mutex_unlock(tp->lock); + SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t)); + silc_mutex_lock(t->lock); + /* Mark this call to be executed in this thread */ t->run = run; t->run_context = run_context; t->completion = completion; t->completion_context = completion_context; t->schedule = schedule; - silc_list_del(tp->free_threads, t); - /* Signal threads */ - silc_cond_broadcast(tp->pool_signal); + /* Signal the thread */ + silc_cond_signal(t->thread_signal); + silc_mutex_unlock(t->lock); - silc_mutex_unlock(tp->lock); return TRUE; } @@ -519,7 +539,9 @@ void silc_thread_pool_purge(SilcThreadPool tp) continue; } + /* Signal the thread to stop */ t->stop = TRUE; + silc_cond_signal(t->thread_signal); silc_mutex_unlock(t->lock); silc_list_del(tp->free_threads, t); @@ -529,9 +551,6 @@ void silc_thread_pool_purge(SilcThreadPool tp) break; } - /* Signal threads to stop */ - silc_cond_broadcast(tp->pool_signal); - silc_list_start(tp->threads); silc_mutex_unlock(tp->lock); } -- 2.24.0