struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
+ SilcCond thread_signal; /* Condition variable for signalling */
SilcMutex lock; /* Thread lock */
SilcList queue; /* Queue for waiting calls */
SilcList free_queue; /* Queue freelist */
/* Thread pool context */
struct SilcThreadPoolStruct {
SilcStack stack; /* Stack for memory allocation */
- SilcMutex lock; /* Pool lock */
SilcCond pool_signal; /* Condition variable for signalling */
+ SilcMutex lock; /* Pool lock */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
SilcUInt16 min_threads; /* Minimum threads in the pool */
{
SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
- SilcMutex lock = tp->lock;
- SilcCond pool_signal = tp->pool_signal;
+ SilcMutex lock = t->lock;
+ SilcCond thread_signal = t->thread_signal;
silc_mutex_lock(lock);
while (1) {
/* Wait here for code to execute */
while (!t->run && !t->stop)
- silc_cond_wait(pool_signal, lock);
+ silc_cond_wait(thread_signal, lock);
if (silc_unlikely(t->stop)) {
- /* Stop the thread. Remove from threads list and free memory. */
+ /* Stop the thread. Remove from threads list. */
SILC_LOG_DEBUG(("Stop thread %p", t));
+ silc_mutex_lock(tp->lock);
silc_list_del(tp->threads, t);
silc_list_start(tp->threads);
while ((q = silc_list_get(t->free_queue)))
silc_sfree(tp->stack, q);
- silc_mutex_free(t->lock);
+ /* Destroy the thread */
+ silc_mutex_unlock(lock);
+ silc_mutex_free(lock);
+ silc_cond_free(thread_signal);
silc_sfree(tp->stack, t);
/* If we are last thread, signal the waiting destructor. */
if (silc_list_count(tp->threads) == 0)
- silc_cond_signal(pool_signal);
+ silc_cond_signal(tp->pool_signal);
/* Release pool reference. Releases lock also. */
silc_thread_pool_unref(tp);
/* Check if there are calls in queue. Takes the most recently added
call since new ones are added at the start of the list. */
- silc_mutex_lock(t->lock);
+ silc_mutex_lock(lock);
if (silc_list_count(t->queue) > 0) {
+ execute_queue:
silc_list_start(t->queue);
q = silc_list_get(t->queue);
silc_list_del(t->queue, q);
silc_list_add(t->free_queue, q);
- silc_mutex_unlock(t->lock);
+ silc_mutex_unlock(lock);
goto execute;
} else {
- silc_mutex_unlock(t->lock);
+ silc_mutex_unlock(lock);
/* Nothing to do. Attempt to steal call from some other thread. */
- silc_mutex_lock(lock);
+ silc_mutex_lock(tp->lock);
o = silc_list_get(tp->threads);
if (!o) {
/* List wraps around */
silc_list_start(tp->threads);
o = silc_list_get(tp->threads);
}
- silc_mutex_unlock(lock);
+ silc_mutex_unlock(tp->lock);
if (o && o != t) {
silc_mutex_lock(o->lock);
}
}
+ silc_mutex_lock(lock);
+
+ /* Now that we have the lock back, check the queue again. */
+ if (silc_list_count(t->queue) > 0)
+ goto execute_queue;
+
/* The thread is now free for use again. */
t->run = NULL;
t->completion = NULL;
t->schedule = NULL;
-
- silc_mutex_lock(lock);
silc_list_add(tp->free_threads, t);
}
return NULL;
}
+ if (!silc_cond_alloc(&t->thread_signal)) {
+ silc_mutex_free(t->lock);
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
t->tp = tp;
silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
/* Stop threads */
silc_list_start(tp->threads);
- while ((t = silc_list_get(tp->threads)))
+ while ((t = silc_list_get(tp->threads))) {
+ silc_mutex_lock(t->lock);
t->stop = TRUE;
- silc_cond_broadcast(tp->pool_signal);
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
+ }
if (wait_unfinished) {
SILC_LOG_DEBUG(("Wait threads to finish"));
silc_list_start(tp->threads);
t = silc_list_get(tp->threads);
}
+ silc_mutex_unlock(tp->lock);
SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
run, run_context, t));
- /* Lock the thread. Keep also thread pool lock so that this thread
- cannot become free while we're adding call to its queue. */
silc_mutex_lock(t->lock);
/* Get free call context from the list */
q = silc_scalloc(tp->stack, 1, sizeof(*q));
if (!q) {
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return FALSE;
}
} else {
/* Add at the start of the list. It gets executed first. */
silc_list_insert(t->queue, NULL, q);
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
} else {
/* Create new thread */
}
}
+ silc_list_del(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
+
SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
+ silc_mutex_lock(t->lock);
+
/* Mark this call to be executed in this thread */
t->run = run;
t->run_context = run_context;
t->completion = completion;
t->completion_context = completion_context;
t->schedule = schedule;
- silc_list_del(tp->free_threads, t);
- /* Signal threads */
- silc_cond_broadcast(tp->pool_signal);
+ /* Signal the thread */
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
}
continue;
}
+ /* Signal the thread to stop */
t->stop = TRUE;
+ silc_cond_signal(t->thread_signal);
silc_mutex_unlock(t->lock);
silc_list_del(tp->free_threads, t);
break;
}
- /* Signal threads to stop */
- silc_cond_broadcast(tp->pool_signal);
-
silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}