From: Pekka Riikonen
Date: Thu, 26 Jul 2007 15:25:21 +0000 (+0000)
Subject: Optimizations to thread pool. Changed call queues thread
X-Git-Tag: 1.2.beta1~149
X-Git-Url: http://git.silcnet.org/gitweb/?p=crypto.git;a=commitdiff_plain;h=71c2c679c95ba07dc16517a280225aaf902f11a5

Optimizations to thread pool.  Changed call queues to be thread specific,
and calls are now executed in last-in-first-out order.
---

diff --git a/lib/silcutil/silcthread.c b/lib/silcutil/silcthread.c
index d7c59690..3a89206b 100644
--- a/lib/silcutil/silcthread.c
+++ b/lib/silcutil/silcthread.c
@@ -19,16 +19,41 @@
 
 #include "silc.h"
 
+/* Explanation of the thread pool execution.
+
+   When a new call is added to the thread pool by calling
+   silc_thread_pool_run, it is assigned to the first free thread from
+   the free list.  If no threads are free, we take a thread from the
+   threads list and assign the call to its queue.  Each take from the
+   threads list returns a different thread, finally wrapping to the
+   beginning of the list, so every thread gets a chance to execute
+   queued calls.
+
+   The thread function silc_thread_pool_run_thread executes each call.
+   After executing the current call that has been assigned to it, it
+   checks whether there are any queued calls in its queue and executes
+   all of them.  If there aren't any calls in the queue, it attempts to
+   steal a call from some other thread and execute it.
+
+   The queue is always consumed in last-in-first-out order: the most
+   recently added call gets priority.  Under full utilization this helps
+   to avoid CPU cache misses.  Since the queues are thread specific,
+   under full utilization each thread should always be doing work for
+   the most recent (and thus most important) calls. */
+
 /************************** Types and definitions ***************************/
 
-/* Thread pool thread context */
+/* Thread pool thread context.  Each thread contains the most current
+   call to be executed, and a list of queued calls. */
 typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;                /* The thread pool */
-  SilcSchedule schedule;            /* Scheduler, may be NULL */
-  SilcThreadPoolFunc run;           /* The function to run in a thread */
-  SilcTaskCallback completion;      /* Completion function */
+  SilcMutex lock;                   /* Thread lock */
+  SilcList queue;                   /* Queue for waiting calls */
+  SilcList free_queue;              /* Queue freelist */
+  SilcSchedule schedule;            /* The current scheduler, may be NULL */
+  SilcThreadPoolFunc run;           /* The current call to run in a thread */
+  SilcTaskCallback completion;      /* The current completion function */
   void *run_context;
   void *completion_context;
   unsigned int stop : 1;            /* Set to stop the thread */
@@ -41,8 +66,6 @@ struct SilcThreadPoolStruct {
   SilcCond pool_signal;             /* Condition variable for signalling */
   SilcList threads;                 /* Threads in the pool */
   SilcList free_threads;            /* Threads freelist */
-  SilcList queue;                   /* Queue for waiting calls */
-  SilcList free_queue;              /* Queue freelist */
   SilcUInt16 min_threads;           /* Minimum threads in the pool */
   SilcUInt16 max_threads;           /* Maximum threads in the pool */
   SilcUInt16 refcnt;                /* Reference counter */
@@ -84,7 +107,7 @@ static void silc_thread_pool_unref(SilcThreadPool tp)
 
 static void *silc_thread_pool_run_thread(void *context)
 {
-  SilcThreadPoolThread t = context, q;
+  SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
   SilcMutex lock = tp->lock;
   SilcCond pool_signal = tp->pool_signal;
@@ -96,15 +119,26 @@ static void *silc_thread_pool_run_thread(void *context)
     while (!t->run && !t->stop)
       silc_cond_wait(pool_signal, lock);
 
-    if (t->stop) {
+    if (silc_unlikely(t->stop)) {
      /* Stop the thread.  Remove from the threads list and free memory. */
       SILC_LOG_DEBUG(("Stop thread %p", t));
       silc_list_del(tp->threads, t);
+      silc_list_start(tp->threads);
+
+      /* Clear the thread's call queue. */
+      silc_list_start(t->queue);
+      silc_list_start(t->free_queue);
+      while ((q = silc_list_get(t->queue)))
+        silc_sfree(tp->stack, q);
+      while ((q = silc_list_get(t->free_queue)))
+        silc_sfree(tp->stack, q);
+
+      silc_mutex_free(t->lock);
       silc_sfree(tp->stack, t);
 
      /* If we are the last thread, signal the waiting destructor. */
       if (silc_list_count(tp->threads) == 0)
-        silc_cond_broadcast(pool_signal);
+        silc_cond_signal(pool_signal);
 
       /* Release pool reference.  Releases lock also. */
       silc_thread_pool_unref(tp);
@@ -113,6 +147,7 @@ static void *silc_thread_pool_run_thread(void *context)
     silc_mutex_unlock(lock);
 
     /* Execute code */
+  execute:
     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p",
                     t->run, t->run_context, t));
     t->run(t->schedule, t->run_context);
@@ -135,12 +170,12 @@ static void *silc_thread_pool_run_thread(void *context)
       }
     }
 
-    silc_mutex_lock(lock);
-
-    /* Check if there are calls in queue */
-    if (silc_list_count(tp->queue) > 0) {
-      silc_list_start(tp->queue);
-      q = silc_list_get(tp->queue);
+    /* Check if there are calls in the queue.  Take the most recently
+       added call, since new ones are added at the start of the list. */
+    silc_mutex_lock(t->lock);
+    if (silc_list_count(t->queue) > 0) {
+      silc_list_start(t->queue);
+      q = silc_list_get(t->queue);
 
       SILC_LOG_DEBUG(("Execute call from queue"));
 
@@ -151,15 +186,53 @@ static void *silc_thread_pool_run_thread(void *context)
       t->completion_context = q->completion_context;
       t->schedule = q->schedule;
 
-      silc_list_del(tp->queue, q);
-      silc_list_add(tp->free_queue, q);
-      continue;
+      silc_list_del(t->queue, q);
+      silc_list_add(t->free_queue, q);
+      silc_mutex_unlock(t->lock);
+      goto execute;
+    } else {
+      silc_mutex_unlock(t->lock);
+
+      /* Nothing to do.  Attempt to steal a call from some other thread. */
+      silc_mutex_lock(lock);
+      o = silc_list_get(tp->threads);
+      if (!o) {
+        /* List wraps around */
+        silc_list_start(tp->threads);
+        o = silc_list_get(tp->threads);
+      }
+      silc_mutex_unlock(lock);
+
+      if (o && o != t) {
+        silc_mutex_lock(o->lock);
+        if (silc_list_count(o->queue) > 0) {
+          silc_list_start(o->queue);
+          q = silc_list_get(o->queue);
+
+          SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+          /* Execute this call now */
+          t->run = q->run;
+          t->run_context = q->run_context;
+          t->completion = q->completion;
+          t->completion_context = q->completion_context;
+          t->schedule = q->schedule;
+
+          silc_list_del(o->queue, q);
+          silc_list_add(o->free_queue, q);
+          silc_mutex_unlock(o->lock);
+          goto execute;
+        }
+        silc_mutex_unlock(o->lock);
+      }
     }
 
     /* The thread is now free for use again. */
     t->run = NULL;
     t->completion = NULL;
     t->schedule = NULL;
+
+    silc_mutex_lock(lock);
     silc_list_add(tp->free_threads, t);
   }
 
@@ -175,7 +248,17 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
   t = silc_scalloc(tp->stack, 1, sizeof(*t));
   if (!t)
     return NULL;
+
+  if (!silc_mutex_alloc(&t->lock)) {
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
+  silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+  silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+  /* Add to thread pool */
   silc_list_add(tp->threads, t);
   silc_list_add(tp->free_threads, t);
   silc_thread_pool_ref(tp);
@@ -202,6 +285,8 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
   if (max_threads < min_threads)
     return NULL;
+  if (!max_threads)
+    return NULL;
 
   if (stack)
     stack = silc_stack_alloc(0, stack);
@@ -235,12 +320,12 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
 
   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
 
   for (i = 0; i < tp->min_threads && start_min_threads; i++)
     silc_thread_pool_new_thread(tp);
 
+  silc_list_start(tp->threads);
+
   return tp;
 }
@@ -267,16 +352,6 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
       silc_cond_wait(tp->pool_signal, tp->lock);
   }
 
-  /* Free calls from queue */
-  silc_list_start(tp->queue);
-  while ((t = silc_list_get(tp->queue)))
-    silc_sfree(tp->stack, t);
-  silc_list_start(tp->free_queue);
-  while ((t = silc_list_get(tp->free_queue)))
-    silc_sfree(tp->stack, t);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
-
   /* Release reference.  Releases lock also. */
   silc_thread_pool_unref(tp);
 }
@@ -291,7 +366,7 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
                               SilcTaskCallback completion,
                               void *completion_context)
 {
-  SilcThreadPoolThread t;
+  SilcThreadPoolThread t, q;
 
   silc_mutex_lock(tp->lock);
 
@@ -311,28 +386,45 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
       return FALSE;
     }
 
-    SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
-    /* User wants to queue this call until thread becomes free */
-    silc_list_start(tp->free_queue);
-    t = silc_list_get(tp->free_queue);
+    /* User wants to queue this call until a thread becomes free.  Get
+       a thread to assign this call to. */
+    t = silc_list_get(tp->threads);
     if (!t) {
-      t = silc_scalloc(tp->stack, 1, sizeof(*t));
-      if (!t) {
+      /* List wraps around */
+      silc_list_start(tp->threads);
+      t = silc_list_get(tp->threads);
+    }
+
+    SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+                    run, run_context, t));
+
+    /* Lock the thread.  Keep the thread pool lock as well, so that this
+       thread cannot become free while we're adding a call to its queue. */
+    silc_mutex_lock(t->lock);
+
+    /* Get a free call context from the list */
+    silc_list_start(t->free_queue);
+    q = silc_list_get(t->free_queue);
+    if (!q) {
+      q = silc_scalloc(tp->stack, 1, sizeof(*q));
+      if (!q) {
+        silc_mutex_unlock(t->lock);
         silc_mutex_unlock(tp->lock);
         return FALSE;
      }
    } else {
-      silc_list_del(tp->free_queue, t);
+      silc_list_del(t->free_queue, q);
    }
 
-    t->run = run;
-    t->run_context = run_context;
-    t->completion = completion;
-    t->completion_context = completion_context;
-    t->schedule = schedule;
+    q->run = run;
+    q->run_context = run_context;
+    q->completion = completion;
+    q->completion_context = completion_context;
+    q->schedule = schedule;
 
-    silc_list_add(tp->queue, t);
+    /* Add at the start of the list.  It gets executed first. */
+    silc_list_insert(t->queue, NULL, q);
+    silc_mutex_unlock(t->lock);
 
     silc_mutex_unlock(tp->lock);
     return TRUE;
   } else {
@@ -421,10 +513,15 @@ void silc_thread_pool_purge(SilcThreadPool tp)
 
   silc_list_start(tp->threads);
   while ((t = silc_list_get(tp->threads))) {
-    if (t->run)
+    silc_mutex_lock(t->lock);
+    if (t->run) {
+      silc_mutex_unlock(t->lock);
       continue;
+    }
 
     t->stop = TRUE;
+    silc_mutex_unlock(t->lock);
+
     silc_list_del(tp->free_threads, t);
     i--;
@@ -435,5 +532,6 @@ void silc_thread_pool_purge(SilcThreadPool tp)
 
   /* Signal threads to stop */
   silc_cond_broadcast(tp->pool_signal);
+  silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }
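The pattern in the diff above, a per-thread queue pushed and popped at the
head plus a steal path for idle threads, is easier to see outside the SILC
APIs.  The following sketch is illustrative only: it uses plain pthreads
and a hand-rolled singly linked list instead of SilcList/SilcMutex, all
names are invented, and it scans every other worker rather than taking only
the next thread in round-robin order as the commit does.

    #include <pthread.h>
    #include <stdlib.h>

    typedef struct call {
      struct call *next;
      void (*run)(void *context);
      void *context;
    } call_t;

    typedef struct worker {
      pthread_mutex_t lock;     /* Protects this worker's queue */
      call_t *queue;            /* Head; most recently queued call first */
    } worker_t;

    /* Queue a call: push at the head so the newest call runs first
       (last-in-first-out), like silc_list_insert(t->queue, NULL, q). */
    static void worker_queue(worker_t *w, call_t *c)
    {
      pthread_mutex_lock(&w->lock);
      c->next = w->queue;
      w->queue = c;
      pthread_mutex_unlock(&w->lock);
    }

    /* Pop the most recently queued call, or NULL if the queue is empty. */
    static call_t *worker_pop(worker_t *w)
    {
      call_t *c;
      pthread_mutex_lock(&w->lock);
      c = w->queue;
      if (c)
        w->queue = c->next;
      pthread_mutex_unlock(&w->lock);
      return c;
    }

    /* Thread body: drain our own queue newest-first, then try to steal
       one call from another worker, as silc_thread_pool_run_thread does. */
    static void worker_run(worker_t *self, worker_t *workers, int num_workers)
    {
      call_t *c;
      int i, stole;

      for (;;) {
        /* Own queue first, in LIFO order */
        while ((c = worker_pop(self))) {
          c->run(c->context);
          free(c);
        }

        /* Own queue empty; steal one call from some other worker */
        stole = 0;
        for (i = 0; i < num_workers; i++) {
          if (&workers[i] == self)
            continue;
          if ((c = worker_pop(&workers[i]))) {
            c->run(c->context);
            free(c);
            stole = 1;
            break;
          }
        }
        if (!stole)
          break;        /* Idle; a real pool would wait on a condition */
      }
    }

Pushing and popping at the same end is what makes the hot path cheap: the
call most likely to still be warm in the CPU cache is always the next one
executed, and each queue is protected by its own lock rather than the
single pool-wide lock the old code contended on.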
diff --git a/lib/silcutil/silcutil.h b/lib/silcutil/silcutil.h
index 300c0a43..e62e768d 100644
--- a/lib/silcutil/silcutil.h
+++ b/lib/silcutil/silcutil.h
@@ -156,8 +156,7 @@ char *silc_format(char *fmt, ...);
  *
  *    Basic hash function to hash strings.  May be used with the SilcHashTable.
  *    Note that this lowers the characters of the string (with tolower()) so
- *    this is used usually with nicknames, channel and server names to provide
- *    case insensitive keys.
+ *    this can be used to provide case-insensitive hashing.
  *
  ***/
 SilcUInt32 silc_hash_string(void *key, void *user_context);
@@ -251,8 +250,8 @@ SilcUInt32 silc_hash_data(void *key, void *user_context);
  *
  * DESCRIPTION
  *
- *    Compares two strings.  It may be used as SilcHashTable comparison
- *    function.
+ *    Compares two strings.  This ignores the case while comparing.  It may
+ *    be used as a SilcHashTable comparison function.
  *
 ***/
SilcBool silc_hash_string_compare(void *key1, void *key2, void *user_context);
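These silcutil.h hunks only reword documentation: silc_hash_string
lowercases each character with tolower() before hashing, and
silc_hash_string_compare ignores case, so together they give SilcHashTable
a case-insensitive key function and comparator.  A rough stand-in pair
showing that contract (the hash mixing below is invented; this is not the
real silcutil implementation):

    #include <ctype.h>

    /* Case-insensitive string hash: lowercase each character before
       mixing it in, so "Foo" and "foo" hash to the same bucket. */
    unsigned long hash_string_ci(const char *key)
    {
      unsigned long h = 0;
      while (*key)
        h = (h << 5) + h + (unsigned char)tolower((unsigned char)*key++);
      return h;
    }

    /* Matching equality test: nonzero when the strings are equal
       ignoring case.  Hash and comparator must agree, or equal keys
       could land in different buckets. */
    int string_equal_ci(const char *a, const char *b)
    {
      while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b))
          return 0;
        a++, b++;
      }
      return *a == *b;  /* True only if both strings ended together */
    }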
diff --git a/lib/silcutil/tests/test_silcthread.c b/lib/silcutil/tests/test_silcthread.c
index 6305d7c1..d59ed4e0 100644
--- a/lib/silcutil/tests/test_silcthread.c
+++ b/lib/silcutil/tests/test_silcthread.c
@@ -46,13 +46,13 @@ int main(int argc, char **argv)
   tp = silc_thread_pool_alloc(NULL, 0, 2, FALSE);
   if (!tp)
     goto err;
-  for (i = 0; i < 4; i++) {
+  for (i = 0; i < 6; i++) {
     SILC_LOG_DEBUG(("Run thread %d", i + 1));
     if (!silc_thread_pool_run(tp, TRUE, NULL, func, (void *)i + 1,
                               compl, (void *)i + 1))
       goto err;
   }
 
-  sleep(4);
+  sleep(6);
 
   SILC_LOG_DEBUG(("Stop thread pool"));
   silc_thread_pool_free(tp, TRUE);
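The updated test exercises the new queues: the pool is capped at two
threads, so of the six queued calls the first two start immediately and
the remaining four land in the per-thread queues, to be drained
newest-first.  Reusing the illustrative worker_t/call_t sketch from above
(again, invented names, not the SILC API), a tiny driver makes the LIFO
order visible:

    #include <stdio.h>

    static void print_call(void *context)
    {
      printf("executing call %d\n", (int)(long)context);
    }

    int main(void)
    {
      worker_t w = { PTHREAD_MUTEX_INITIALIZER, NULL };
      int i;

      /* Queue calls 3..6, as silc_thread_pool_run would when no thread
         in the pool is free. */
      for (i = 3; i <= 6; i++) {
        call_t *c = malloc(sizeof(*c));
        if (!c)
          return 1;
        c->run = print_call;
        c->context = (void *)(long)i;
        worker_queue(&w, c);
      }

      /* Drains the queue newest-first: prints calls 6, 5, 4, 3. */
      worker_run(&w, &w, 1);
      return 0;
    }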