X-Git-Url: http://git.silcnet.org/gitweb/?a=blobdiff_plain;f=lib%2Fsilcutil%2Fsilcthread.c;h=8e51a15d13ef4c182382cf029f3b0f41512c3deb;hb=e7b6c157b80152bf9fb9266e6bdd93f9fb0db776;hp=0c6d6421a4b810e39cde8d72d6f91cb89dedaf53;hpb=da102216367c1d997f60755357d759e03f87940a;p=silc.git

diff --git a/lib/silcutil/silcthread.c b/lib/silcutil/silcthread.c
index 0c6d6421..8e51a15d 100644
--- a/lib/silcutil/silcthread.c
+++ b/lib/silcutil/silcthread.c
@@ -19,36 +19,56 @@
 #include "silc.h"
 
+/***************************** Thread Pool API *****************************/
+
+/* Explanation of the thread pool execution.
+
+   When a new call is added to the thread pool by calling
+   silc_thread_pool_run, it is assigned to the first free thread from the
+   free list.  If no threads are free, we take one from the threads list
+   and assign the call to its queue.  Iterating the threads list always
+   yields a different thread, eventually wrapping around to the beginning.
+   This way each thread gets a chance to execute queued calls.
+
+   The thread function silc_thread_pool_run_thread executes each call.
+   After executing the call currently assigned to it, the thread checks
+   its queue and executes all queued calls.  If the queue is empty, it
+   attempts to steal a call from some other thread's queue and execute
+   it.
+
+   The queue is always consumed in last-in-first-out order: the most
+   recently added call gets priority.  Under full utilization this helps
+   avoid CPU cache misses.  Since the queues are thread-specific, under
+   full utilization each thread should always be doing work for the most
+   recent (and thus most important) calls. */
+
 /************************** Types and definitions ***************************/
 
-/* Thread pool thread context */
+/* Thread pool thread context.  Each thread contains the current call to
+   be executed, and a list of queued calls. */
 typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;              /* The thread pool */
-  SilcSchedule schedule;          /* Scheduler, may be NULL */
-  SilcThreadPoolFunc run;         /* The function to run in a thread */
-  SilcThreadPoolFunc completion;  /* Completion function */
+  SilcCond thread_signal;         /* Condition variable for signalling */
+  SilcMutex lock;                 /* Thread lock */
+  SilcList queue;                 /* Queue for waiting calls */
+  SilcList free_queue;            /* Queue freelist */
+  SilcSchedule schedule;          /* The current scheduler, may be NULL */
+  SilcThreadPoolFunc run;         /* The current call to run in a thread */
+  SilcTaskCallback completion;    /* The current completion function */
   void *run_context;
   void *completion_context;
   unsigned int stop : 1;          /* Set to stop the thread */
 } *SilcThreadPoolThread;
 
-/* Completion context */
-typedef struct SilcThreadPoolCompletionStruct {
-  SilcSchedule schedule;          /* Scheduler, may be NULL */
-  SilcThreadPoolFunc completion;  /* Completion function */
-  void *completion_context;
-} *SilcThreadPoolCompletion;
-
 /* Thread pool context */
 struct SilcThreadPoolStruct {
   SilcStack stack;                /* Stack for memory allocation */
-  SilcMutex lock;                 /* Pool lock */
   SilcCond pool_signal;           /* Condition variable for signalling */
+  SilcMutex lock;                 /* Pool lock */
   SilcList threads;               /* Threads in the pool */
   SilcList free_threads;          /* Threads freelist */
-  SilcList queue;                 /* Queue for waiting calls */
   SilcUInt16 min_threads;         /* Minimum threads in the pool */
   SilcUInt16 max_threads;         /* Maximum threads in the pool */
   SilcUInt16 refcnt;              /* Reference counter */
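For orientation before the implementation hunks, here is a minimal usage sketch of the reworked API. The silc_thread_pool_alloc and silc_thread_pool_run prototypes are the ones appearing later in this diff; the SILC_TASK_CALLBACK macro comes from the scheduler API, and do_work, work_done and start_work are invented for illustration:

  #include "silc.h"

  /* The call to execute in a pool thread (SilcThreadPoolFunc). */
  static void do_work(SilcSchedule schedule, void *context)
  {
    /* Heavy work runs here, in a pool thread. */
  }

  /* The completion, now a SilcTaskCallback delivered as a scheduler task. */
  SILC_TASK_CALLBACK(work_done)
  {
    /* Runs from the scheduler after do_work has returned. */
  }

  static void start_work(SilcSchedule schedule)
  {
    SilcThreadPool tp;

    /* 2..8 threads; start the two minimum threads immediately. */
    tp = silc_thread_pool_alloc(NULL, 2, 8, TRUE);
    if (!tp)
      return;

    /* queuable = TRUE: if all eight threads are busy, queue the call. */
    silc_thread_pool_run(tp, TRUE, schedule, do_work, NULL,
                         work_done, NULL);

    /* Wait for unfinished calls before releasing the pool. */
    silc_thread_pool_free(tp, TRUE);
  }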
@@ -74,58 +94,40 @@ static void silc_thread_pool_unref(SilcThreadPool tp)
   SILC_LOG_DEBUG(("Thread pool %p refcnt %d -> %d", tp,
                   tp->refcnt + 1, tp->refcnt));
   if (!tp->refcnt) {
+    SilcStack stack = tp->stack;
     silc_mutex_unlock(tp->lock);
     silc_mutex_free(tp->lock);
     silc_cond_free(tp->pool_signal);
-    silc_free(tp);
+    silc_sfree(stack, tp);
+    silc_stack_free(stack);
     return;
   }
   silc_mutex_unlock(tp->lock);
 }
 
-/* Thread completion callback */
-
-SILC_TASK_CALLBACK(silc_thread_pool_run_completion)
-{
-  SilcThreadPoolCompletion c = context;
-  c->completion(c->schedule, c->completion_context);
-  silc_free(c);
-}
-
 /* The thread executor.  Each thread in the pool is run here.  They wait
    here for something to do which is given to them by silc_thread_pool_run. */
 
 static void *silc_thread_pool_run_thread(void *context)
 {
-  SilcThreadPoolThread t = context, q;
+  SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
-  SilcMutex lock = tp->lock;
-  SilcCond pool_signal = tp->pool_signal;
+  SilcMutex lock = t->lock;
+  SilcCond thread_signal = t->thread_signal;
 
   silc_mutex_lock(lock);
 
   while (1) {
     /* Wait here for code to execute */
     while (!t->run && !t->stop)
-      silc_cond_wait(pool_signal, lock);
-
-    if (t->stop) {
-      /* Stop the thread.  Remove from threads list and free memory. */
-      SILC_LOG_DEBUG(("Stop thread %p", t));
-      silc_list_del(tp->threads, t);
-      silc_free(t);
-
-      /* If we are last thread, signal the waiting destructor. */
-      if (silc_list_count(tp->threads) == 0)
-        silc_cond_signal(pool_signal);
+      silc_cond_wait(thread_signal, lock);
 
-      /* Release pool reference.  Releases lock also. */
-      silc_thread_pool_unref(tp);
-      break;
-    }
-    silc_mutex_unlock(lock);
+    if (t->stop)
+      goto stop;
 
     /* Execute code */
+    silc_mutex_unlock(lock);
+  execute:
     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
                     t->run_context, t));
     t->run(t->schedule, t->run_context);
@@ -135,31 +137,29 @@ static void *silc_thread_pool_run_thread(void *context)
        is running. */
     if (t->completion) {
       if (t->schedule) {
-        SilcThreadPoolCompletion c = silc_calloc(1, sizeof(*c));
-        if (c) {
-          SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
-          c->schedule = t->schedule;
-          c->completion = t->completion;
-          c->completion_context = t->completion_context;
-          silc_schedule_task_add_timeout(c->schedule,
                                          silc_thread_pool_run_completion, c,
-                                         0, 0);
-          silc_schedule_wakeup(c->schedule);
-        } else {
-          t->completion(NULL, t->completion_context);
+        SILC_LOG_DEBUG(("Run completion through scheduler %p", t->schedule));
+        if (!silc_schedule_task_add_timeout(t->schedule, t->completion,
+                                            t->completion_context, 0, 0)) {
+          SILC_LOG_DEBUG(("Run completion directly"));
+          t->completion(NULL, NULL, 0, 0, t->completion_context);
         }
+        silc_schedule_wakeup(t->schedule);
       } else {
        SILC_LOG_DEBUG(("Run completion directly"));
-        t->completion(NULL, t->completion_context);
+        t->completion(NULL, NULL, 0, 0, t->completion_context);
       }
     }
 
     silc_mutex_lock(lock);
+    if (t->stop)
+      goto stop;
 
-    /* Check if there are calls in queue */
-    if (silc_list_count(tp->queue) > 0) {
-      silc_list_start(tp->queue);
-      q = silc_list_get(tp->queue);
+    /* Check if there are calls in the queue.  Take the most recently
+       added call; new ones are added at the start of the list. */
+    if (silc_list_count(t->queue) > 0) {
+    execute_queue:
+      silc_list_start(t->queue);
+      q = silc_list_get(t->queue);
 
       SILC_LOG_DEBUG(("Execute call from queue"));
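Since the completion is now an ordinary scheduler task callback, silc_schedule_task_add_timeout can queue it directly and the silc_thread_pool_run_completion trampoline removed above becomes unnecessary. Here is a sketch of a completion written against this convention; the parameter list follows the SilcTaskCallback typedef, and the NULL/0 arguments mirror the direct t->completion(NULL, NULL, 0, 0, ...) calls in this hunk:

  /* Completion callback.  When dispatch through the scheduler fails, or
     no scheduler was given, the pool calls this directly from the pool
     thread with a NULL schedule. */
  static void my_completion(SilcSchedule schedule, void *app_context,
                            SilcTaskEvent type, SilcUInt32 fd,
                            void *context)
  {
    if (!schedule) {
      /* Running in the pool thread; keep this path short. */
      return;
    }
    /* Running as a zero-timeout task in the scheduler thread. */
  }

The silc_schedule_wakeup call matters here: the scheduler may be sleeping in its event loop, and the wakeup forces it to notice the freshly added zero-timeout task.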
@@ -170,9 +170,60 @@ static void *silc_thread_pool_run_thread(void *context)
       t->completion_context = q->completion_context;
       t->schedule = q->schedule;
 
-      silc_list_del(tp->queue, q);
-      silc_free(q);
-      continue;
+      silc_list_del(t->queue, q);
+      silc_list_add(t->free_queue, q);
+      silc_mutex_unlock(lock);
+      goto execute;
+    }
+
+    silc_mutex_unlock(lock);
+    silc_mutex_lock(tp->lock);
+
+    /* Nothing to do.  Attempt to steal a call from some other thread. */
+    o = silc_list_get(tp->threads);
+    if (!o) {
+      /* List wraps around */
+      silc_list_start(tp->threads);
+      o = silc_list_get(tp->threads);
+    }
+
+    /* Check that the other thread is valid and has something to execute. */
+    silc_mutex_lock(o->lock);
+    if (o == t || o->stop || silc_list_count(o->queue) == 0) {
+      silc_mutex_unlock(o->lock);
+      o = NULL;
+    }
+
+    if (o) {
+      silc_mutex_unlock(tp->lock);
+      silc_list_start(o->queue);
+      q = silc_list_get(o->queue);
+
+      SILC_LOG_DEBUG(("Execute call from queue of thread %p", o));
+
+      /* Execute this call now */
+      t->run = q->run;
+      t->run_context = q->run_context;
+      t->completion = q->completion;
+      t->completion_context = q->completion_context;
+      t->schedule = q->schedule;
+
+      silc_list_del(o->queue, q);
+      silc_list_add(o->free_queue, q);
+      silc_mutex_unlock(o->lock);
+      goto execute;
+    }
+
+    silc_mutex_lock(lock);
+    if (t->stop) {
+      silc_mutex_unlock(tp->lock);
+      goto stop;
+    }
+
+    /* Now that we have the lock back, check the queue again. */
+    if (silc_list_count(t->queue) > 0) {
+      silc_mutex_unlock(tp->lock);
+      goto execute_queue;
     }
 
     /* The thread is now free for use again. */
@@ -180,8 +231,41 @@ static void *silc_thread_pool_run_thread(void *context)
     t->run = NULL;
     t->completion = NULL;
     t->schedule = NULL;
     silc_list_add(tp->free_threads, t);
+    silc_mutex_unlock(tp->lock);
   }
 
+ stop:
+  /* Stop the thread.  Remove from the threads list. */
+  SILC_LOG_DEBUG(("Stop thread %p", t));
+
+  /* We can unlock the thread now.  Once we take the thread pool lock,
+     no one can retrieve the thread anymore. */
+  silc_mutex_unlock(lock);
+  silc_mutex_lock(tp->lock);
+
+  silc_list_del(tp->threads, t);
+  silc_list_start(tp->threads);
+
+  /* Clear the thread's call queue. */
+  silc_list_start(t->queue);
+  silc_list_start(t->free_queue);
+  while ((q = silc_list_get(t->queue)))
+    silc_sfree(tp->stack, q);
+  while ((q = silc_list_get(t->free_queue)))
+    silc_sfree(tp->stack, q);
+
+  /* Destroy the thread */
+  silc_mutex_free(lock);
+  silc_cond_free(thread_signal);
+  silc_sfree(tp->stack, t);
+
+  /* If we are the last thread, signal the waiting destructor. */
+  if (silc_list_count(tp->threads) == 0)
+    silc_cond_signal(tp->pool_signal);
+
+  /* Release pool reference.  Releases lock also. */
+  silc_thread_pool_unref(tp);
+
   return NULL;
 }
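Each thread now has a private lock and condition variable, so the pool wakes exactly one worker with silc_cond_signal instead of broadcasting on a shared pool signal. The wait/dispatch pattern in miniature, using the same SILC primitives; Worker is an invented type, and its lock and signal are assumed to have been created with silc_mutex_alloc and silc_cond_alloc:

  typedef struct {
    SilcMutex lock;          /* Protects the fields below */
    SilcCond signal;         /* Signalled when work or stop is set */
    SilcBool work;
    SilcBool stop;
  } Worker;

  static void *worker_thread(void *context)
  {
    Worker *w = context;

    silc_mutex_lock(w->lock);
    while (1) {
      /* The predicate loop guards against spurious wakeups, just like
         the (!t->run && !t->stop) loop in the pool code above. */
      while (!w->work && !w->stop)
        silc_cond_wait(w->signal, w->lock);
      if (w->stop)
        break;
      w->work = FALSE;

      silc_mutex_unlock(w->lock);
      /* Do the actual work here, without holding the lock. */
      silc_mutex_lock(w->lock);
    }
    silc_mutex_unlock(w->lock);
    return NULL;
  }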
@@ -191,10 +275,26 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
 {
   SilcThreadPoolThread t;
 
-  t = silc_calloc(1, sizeof(*t));
+  t = silc_scalloc(tp->stack, 1, sizeof(*t));
   if (!t)
     return NULL;
+
+  if (!silc_mutex_alloc(&t->lock)) {
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
+  if (!silc_cond_alloc(&t->thread_signal)) {
+    silc_mutex_free(t->lock);
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
+  silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+  silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+  /* Add to thread pool */
   silc_list_add(tp->threads, t);
   silc_list_add(tp->free_threads, t);
   silc_thread_pool_ref(tp);
@@ -219,12 +319,25 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
   SilcThreadPool tp;
   int i;
 
-  if (max_threads < min_threads)
+  if (max_threads < min_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
+                          "Max threads is smaller than min threads (%d < %d)",
+                          max_threads, min_threads);
+    return NULL;
+  }
+  if (!max_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
     return NULL;
+  }
+
+  if (stack)
+    stack = silc_stack_alloc(0, stack);
 
-  tp = silc_calloc(1, sizeof(*tp));
-  if (!tp)
+  tp = silc_scalloc(stack, 1, sizeof(*tp));
+  if (!tp) {
+    silc_stack_free(stack);
     return NULL;
+  }
 
   SILC_LOG_DEBUG(("Starting thread pool %p, min threads %d, max threads %d",
                   tp, min_threads, max_threads));
@@ -235,23 +348,26 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
   tp->refcnt++;
 
   if (!silc_mutex_alloc(&tp->lock)) {
-    silc_free(tp);
+    silc_sfree(stack, tp);
+    silc_stack_free(stack);
     return NULL;
   }
 
   if (!silc_cond_alloc(&tp->pool_signal)) {
     silc_mutex_free(tp->lock);
-    silc_free(tp);
+    silc_sfree(stack, tp);
+    silc_stack_free(stack);
     return NULL;
   }
 
   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
 
   for (i = 0; i < tp->min_threads && start_min_threads; i++)
     silc_thread_pool_new_thread(tp);
 
+  silc_list_start(tp->threads);
+
   return tp;
 }
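The pool now carves a child stack out of the caller's stack and releases it with the last pool reference, so every pool allocation is returned in one place. A caller-side sketch, assuming the prototype above; passing NULL instead of a stack keeps ordinary heap allocation:

  SilcStack stack;
  SilcThreadPool tp;

  stack = silc_stack_alloc(0, NULL);     /* parent stack, default size */
  if (stack) {
    /* The pool takes its own child stack internally, so per-call
       allocations do not accumulate in the caller's stack. */
    tp = silc_thread_pool_alloc(stack, 0, 4, FALSE);
    if (tp)
      silc_thread_pool_free(tp, TRUE);
    silc_stack_free(stack);
  }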
@@ -268,9 +384,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
   /* Stop threads */
   silc_list_start(tp->threads);
-  while ((t = silc_list_get(tp->threads)))
+  while ((t = silc_list_get(tp->threads))) {
+    silc_mutex_lock(t->lock);
     t->stop = TRUE;
-    silc_cond_signal(tp->pool_signal);
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+  }
 
   if (wait_unfinished) {
     SILC_LOG_DEBUG(("Wait threads to finish"));
@@ -278,12 +397,6 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
       silc_cond_wait(tp->pool_signal, tp->lock);
   }
 
-  /* Free calls from queue */
-  silc_list_start(tp->queue);
-  while ((t = silc_list_get(tp->queue)))
-    silc_free(t);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-
   /* Release reference.  Releases lock also. */
   silc_thread_pool_unref(tp);
 }
@@ -295,46 +408,68 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
                              SilcSchedule schedule,
                              SilcThreadPoolFunc run,
                              void *run_context,
-                             SilcThreadPoolFunc completion,
+                             SilcTaskCallback completion,
                              void *completion_context)
 {
-  SilcThreadPoolThread t;
+  SilcThreadPoolThread t, q;
 
   silc_mutex_lock(tp->lock);
 
   if (tp->destroy) {
     silc_mutex_unlock(tp->lock);
+    silc_set_errno(SILC_ERR_NOT_VALID);
     return FALSE;
   }
 
   /* Get free thread */
   silc_list_start(tp->free_threads);
   t = silc_list_get(tp->free_threads);
-  if (!t) {
+  if (!t || t->stop) {
     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
       /* Maximum threads reached */
       if (!queuable) {
         silc_mutex_unlock(tp->lock);
+        silc_set_errno(SILC_ERR_LIMIT);
         return FALSE;
       }
 
-      SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
-      /* User wants to queue this call until thread becomes free */
-      t = silc_calloc(1, sizeof(*t));
+      /* User wants to queue this call until a thread becomes free.  Get
+         a thread to assign the call to. */
+      t = silc_list_get(tp->threads);
       if (!t) {
-        silc_mutex_unlock(tp->lock);
-        return FALSE;
+        /* List wraps around */
+        silc_list_start(tp->threads);
+        t = silc_list_get(tp->threads);
       }
+      silc_mutex_unlock(tp->lock);
 
-      t->run = run;
-      t->run_context = run_context;
-      t->completion = completion;
-      t->completion_context = completion_context;
-      t->schedule = schedule;
+      SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+                      run, run_context, t));
 
-      silc_list_add(tp->queue, t);
-      silc_mutex_unlock(tp->lock);
+      silc_mutex_lock(t->lock);
+
+      /* Get a free call context from the list */
+      silc_list_start(t->free_queue);
+      q = silc_list_get(t->free_queue);
+      if (!q) {
+        q = silc_scalloc(tp->stack, 1, sizeof(*q));
+        if (!q) {
+          silc_mutex_unlock(t->lock);
+          return FALSE;
+        }
+      } else {
+        silc_list_del(t->free_queue, q);
+      }
+
+      q->run = run;
+      q->run_context = run_context;
+      q->completion = completion;
+      q->completion_context = completion_context;
+      q->schedule = schedule;
+
+      /* Add at the start of the list.  It gets executed first. */
+      silc_list_insert(t->queue, NULL, q);
+      silc_mutex_unlock(t->lock);
 
       return TRUE;
     } else {
       /* Create new thread */
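Queued calls are pushed to the head of the thread's queue, and the executor restarts the iterator and takes the first entry, which produces the LIFO order described at the top of the file. The list mechanics in miniature; EntryStruct is invented, and the NULL position argument to silc_list_insert means head insertion, per the comment in the hunk above:

  #include "silc.h"

  typedef struct EntryStruct {
    struct EntryStruct *next;
    int value;
  } *Entry;

  static void lifo_demo(void)
  {
    SilcList list;
    Entry e1 = silc_calloc(1, sizeof(*e1));
    Entry e2 = silc_calloc(1, sizeof(*e2));
    Entry e;

    if (!e1 || !e2) {
      silc_free(e1);
      silc_free(e2);
      return;
    }

    silc_list_init(list, struct EntryStruct, next);
    silc_list_insert(list, NULL, e1);    /* list: e1 */
    silc_list_insert(list, NULL, e2);    /* list: e2, e1 */

    /* Consume from the front: the most recently queued entry wins. */
    silc_list_start(list);
    e = silc_list_get(list);             /* e == e2 */
    SILC_LOG_DEBUG(("Got entry %p, expected %p", e, e2));

    silc_free(e1);
    silc_free(e2);
  }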
@@ -346,20 +481,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
     }
   }
 
+  silc_list_del(tp->free_threads, t);
+  silc_mutex_unlock(tp->lock);
+
   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
 
+  silc_mutex_lock(t->lock);
+
   /* Mark this call to be executed in this thread */
   t->run = run;
   t->run_context = run_context;
   t->completion = completion;
   t->completion_context = completion_context;
   t->schedule = schedule;
 
-  silc_list_del(tp->free_threads, t);
-
-  /* Signal threads */
-  silc_cond_signal(tp->pool_signal);
+  /* Signal the thread */
+  silc_cond_signal(t->thread_signal);
+  silc_mutex_unlock(t->lock);
 
-  silc_mutex_unlock(tp->lock);
   return TRUE;
 }
@@ -377,7 +516,7 @@ void silc_thread_pool_set_max_threads(SilcThreadPool tp,
 
 /* Get maximum threads in the pool */
 
-SilcUInt32 silc_thread_pool_num_max_threads(SilcThreadPool tp)
+SilcUInt32 silc_thread_pool_get_max_threads(SilcThreadPool tp)
 {
   SilcUInt32 max_threads;
 
@@ -411,6 +550,7 @@ void silc_thread_pool_purge(SilcThreadPool tp)
   silc_mutex_lock(tp->lock);
 
   if (silc_list_count(tp->free_threads) <= tp->min_threads) {
+    SILC_LOG_DEBUG(("No threads to purge"));
     silc_mutex_unlock(tp->lock);
     return;
   }
@@ -421,10 +561,17 @@ void silc_thread_pool_purge(SilcThreadPool tp)
   silc_list_start(tp->threads);
   while ((t = silc_list_get(tp->threads))) {
-    if (t->run)
+    silc_mutex_lock(t->lock);
+    if (t->run) {
+      silc_mutex_unlock(t->lock);
       continue;
+    }
 
+    /* Signal the thread to stop */
     t->stop = TRUE;
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+
     silc_list_del(tp->free_threads, t);
 
     i--;
 
@@ -432,8 +579,30 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       break;
   }
 
-  /* Signal threads to stop */
-  silc_cond_signal(tp->pool_signal);
-
+  silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }
+
+/*************************** Thread-local Storage ***************************/
+
+void silc_thread_tls_set(void *context)
+{
+  SilcTls tls = silc_thread_get_tls();
+
+  if (!tls) {
+    /* Initialize TLS for this thread */
+    tls = silc_thread_tls_init();
+    if (!tls)
+      return;
+  }
+
+  tls->thread_context = context;
+}
+
+void *silc_thread_tls_get(void)
+{
+  SilcTls tls = silc_thread_get_tls();
+  if (!tls)
+    return NULL;
+  return tls->thread_context;
+}
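The new TLS helpers keep a single per-thread context pointer and initialize the thread's TLS slot lazily on the first set; silc_thread_get_tls and silc_thread_tls_init are the platform-internal helpers used above, so callers need only the two functions defined here. A minimal sketch:

  #include "silc.h"

  static void *tls_thread(void *context)
  {
    void *data;

    /* The first set initializes this thread's TLS slot lazily. */
    silc_thread_tls_set(context);

    /* Anywhere later in the same thread: */
    data = silc_thread_tls_get();        /* == context in this thread */
    return data;
  }

  /* Each thread gets its own slot, started for example with:
     silc_thread_create(tls_thread, some_context, FALSE); */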