#include "silc.h"
+/* Explanation of the thread pool execution.
+
+ When new call is added to thread pool by calling silc_thread_pool_run
+ it is assigned to a first free thread from the free list. If no threads
+ are available we take one from the threads list and assign the call to
+ its queue. The threads list always takes different thread finally wrapping
+ from the beginning. This way each thread will get a chance to execute
+ queued calls.
+
+ The thread function silc_thread_pool_run_thread executes each call. After
+ executing the current call that has been assigned to it, it will check
+ if there are any queued calls in its queue, and it will execute all calls
+ from the queue. If there aren't any calls in the queue, it will attempt
+ to steal a call from some other thread and execute it.
+
+ The queue list is always consumed in last-in-first-out order. The most
+ recently added call gets priority. With full utilization this helps to
+ avoid CPU cache misses. Since the queues are thread specific with full
+ utilization each thread should always be doing work for the most recent
+ (and thus most important) calls. */
+
/************************** Types and definitions ***************************/
-/* Thread pool thread context */
+/* Thread pool thread context. Each thread contains the most current call
+ to be executed, and a list of queued calls. */
typedef struct SilcThreadPoolThreadStruct {
struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
- SilcSchedule schedule; /* Scheduler, may be NULL */
- SilcThreadPoolFunc run; /* The function to run in a thread */
- SilcTaskCallback completion; /* Completion function */
+ SilcMutex lock; /* Thread lock */
+ SilcList queue; /* Queue for waiting calls */
+ SilcList free_queue; /* Queue freelist */
+ SilcSchedule schedule; /* The current Scheduler, may be NULL */
+ SilcThreadPoolFunc run; /* The current call to run in a thread */
+ SilcTaskCallback completion; /* The current Completion function */
void *run_context;
void *completion_context;
unsigned int stop : 1; /* Set to stop the thread */
SilcCond pool_signal; /* Condition variable for signalling */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
- SilcList queue; /* Queue for waiting calls */
- SilcList free_queue; /* Queue freelist */
SilcUInt16 min_threads; /* Minimum threads in the pool */
SilcUInt16 max_threads; /* Maximum threads in the pool */
SilcUInt16 refcnt; /* Reference counter */
static void *silc_thread_pool_run_thread(void *context)
{
- SilcThreadPoolThread t = context, q;
+ SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
SilcMutex lock = tp->lock;
SilcCond pool_signal = tp->pool_signal;
while (!t->run && !t->stop)
silc_cond_wait(pool_signal, lock);
- if (t->stop) {
+ if (silc_unlikely(t->stop)) {
/* Stop the thread. Remove from threads list and free memory. */
SILC_LOG_DEBUG(("Stop thread %p", t));
silc_list_del(tp->threads, t);
+ silc_list_start(tp->threads);
+
+ /* Clear thread's call queue. */
+ silc_list_start(t->queue);
+ silc_list_start(t->free_queue);
+ while ((q = silc_list_get(t->queue)))
+ silc_sfree(tp->stack, q);
+ while ((q = silc_list_get(t->free_queue)))
+ silc_sfree(tp->stack, q);
+
+ silc_mutex_free(t->lock);
silc_sfree(tp->stack, t);
/* If we are last thread, signal the waiting destructor. */
if (silc_list_count(tp->threads) == 0)
- silc_cond_broadcast(pool_signal);
+ silc_cond_signal(pool_signal);
/* Release pool reference. Releases lock also. */
silc_thread_pool_unref(tp);
silc_mutex_unlock(lock);
/* Execute code */
+ execute:
SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
t->run_context, t));
t->run(t->schedule, t->run_context);
}
}
- silc_mutex_lock(lock);
-
- /* Check if there are calls in queue */
- if (silc_list_count(tp->queue) > 0) {
- silc_list_start(tp->queue);
- q = silc_list_get(tp->queue);
+ /* Check if there are calls in queue. Takes the most recently added
+ call since new ones are added at the start of the list. */
+ silc_mutex_lock(t->lock);
+ if (silc_list_count(t->queue) > 0) {
+ silc_list_start(t->queue);
+ q = silc_list_get(t->queue);
SILC_LOG_DEBUG(("Execute call from queue"));
t->completion_context = q->completion_context;
t->schedule = q->schedule;
- silc_list_del(tp->queue, q);
- silc_list_add(tp->free_queue, q);
- continue;
+ silc_list_del(t->queue, q);
+ silc_list_add(t->free_queue, q);
+ silc_mutex_unlock(t->lock);
+ goto execute;
+ } else {
+ silc_mutex_unlock(t->lock);
+
+ /* Nothing to do. Attempt to steal call from some other thread. */
+ silc_mutex_lock(lock);
+ o = silc_list_get(tp->threads);
+ if (!o) {
+ /* List wraps around */
+ silc_list_start(tp->threads);
+ o = silc_list_get(tp->threads);
+ }
+ silc_mutex_unlock(lock);
+
+ if (o && o != t) {
+ silc_mutex_lock(o->lock);
+ if (silc_list_count(o->queue) > 0) {
+ silc_list_start(o->queue);
+ q = silc_list_get(o->queue);
+
+ SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+ /* Execute this call now */
+ t->run = q->run;
+ t->run_context = q->run_context;
+ t->completion = q->completion;
+ t->completion_context = q->completion_context;
+ t->schedule = q->schedule;
+
+ silc_list_del(o->queue, q);
+ silc_list_add(o->free_queue, q);
+ silc_mutex_unlock(o->lock);
+ goto execute;
+ }
+ silc_mutex_unlock(o->lock);
+ }
}
/* The thread is now free for use again. */
t->run = NULL;
t->completion = NULL;
t->schedule = NULL;
+
+ silc_mutex_lock(lock);
silc_list_add(tp->free_threads, t);
}
t = silc_scalloc(tp->stack, 1, sizeof(*t));
if (!t)
return NULL;
+
+ if (!silc_mutex_alloc(&t->lock)) {
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
t->tp = tp;
+ silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+ silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+ /* Add to thread pool */
silc_list_add(tp->threads, t);
silc_list_add(tp->free_threads, t);
silc_thread_pool_ref(tp);
if (max_threads < min_threads)
return NULL;
+ if (!max_threads)
+ return NULL;
if (stack)
stack = silc_stack_alloc(0, stack);
silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
- silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
- silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
for (i = 0; i < tp->min_threads && start_min_threads; i++)
silc_thread_pool_new_thread(tp);
+ silc_list_start(tp->threads);
+
return tp;
}
silc_cond_wait(tp->pool_signal, tp->lock);
}
- /* Free calls from queue */
- silc_list_start(tp->queue);
- while ((t = silc_list_get(tp->queue)))
- silc_sfree(tp->stack, t);
- silc_list_start(tp->free_queue);
- while ((t = silc_list_get(tp->free_queue)))
- silc_sfree(tp->stack, t);
- silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
- silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
-
/* Release reference. Releases lock also. */
silc_thread_pool_unref(tp);
}
SilcTaskCallback completion,
void *completion_context)
{
- SilcThreadPoolThread t;
+ SilcThreadPoolThread t, q;
silc_mutex_lock(tp->lock);
return FALSE;
}
- SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
- /* User wants to queue this call until thread becomes free */
- silc_list_start(tp->free_queue);
- t = silc_list_get(tp->free_queue);
+ /* User wants to queue this call until thread becomes free. Get
+ a thread to assign this call. */
+ t = silc_list_get(tp->threads);
if (!t) {
- t = silc_scalloc(tp->stack, 1, sizeof(*t));
- if (!t) {
+ /* List wraps around */
+ silc_list_start(tp->threads);
+ t = silc_list_get(tp->threads);
+ }
+
+ SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+ run, run_context, t));
+
+ /* Lock the thread. Keep also thread pool lock so that this thread
+ cannot become free while we're adding call to its queue. */
+ silc_mutex_lock(t->lock);
+
+ /* Get free call context from the list */
+ silc_list_start(t->free_queue);
+ q = silc_list_get(t->free_queue);
+ if (!q) {
+ q = silc_scalloc(tp->stack, 1, sizeof(*q));
+ if (!q) {
+ silc_mutex_unlock(t->lock);
silc_mutex_unlock(tp->lock);
return FALSE;
}
} else {
- silc_list_del(tp->free_queue, t);
+ silc_list_del(t->free_queue, q);
}
- t->run = run;
- t->run_context = run_context;
- t->completion = completion;
- t->completion_context = completion_context;
- t->schedule = schedule;
+ q->run = run;
+ q->run_context = run_context;
+ q->completion = completion;
+ q->completion_context = completion_context;
+ q->schedule = schedule;
- silc_list_add(tp->queue, t);
+ /* Add at the start of the list. It gets executed first. */
+ silc_list_insert(t->queue, NULL, q);
+ silc_mutex_unlock(t->lock);
silc_mutex_unlock(tp->lock);
return TRUE;
} else {
silc_list_start(tp->threads);
while ((t = silc_list_get(tp->threads))) {
- if (t->run)
+ silc_mutex_lock(t->lock);
+ if (t->run) {
+ silc_mutex_unlock(t->lock);
continue;
+ }
t->stop = TRUE;
+ silc_mutex_unlock(t->lock);
+
silc_list_del(tp->free_threads, t);
i--;
/* Signal threads to stop */
silc_cond_broadcast(tp->pool_signal);
+ silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}