projects
/
crypto.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
0876c9e
)
Changed condition variables thread specific.
author
Pekka Riikonen
<priikone@silcnet.org>
Thu, 26 Jul 2007 18:18:41 +0000
(18:18 +0000)
committer
Pekka Riikonen
<priikone@silcnet.org>
Thu, 26 Jul 2007 18:18:41 +0000
(18:18 +0000)
lib/silcutil/silcthread.c
patch
|
blob
|
history
diff --git
a/lib/silcutil/silcthread.c
b/lib/silcutil/silcthread.c
index 3a89206b0ededf9c4a01005f62c232bf4516cf6f..ad53a421692192bbcae4a6f5a529f1c5026990e9 100644
(file)
--- a/
lib/silcutil/silcthread.c
+++ b/
lib/silcutil/silcthread.c
@@
-48,6
+48,7
@@
typedef struct SilcThreadPoolThreadStruct {
struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
struct SilcThreadPoolThreadStruct *next;
struct SilcThreadPoolThreadStruct *next2;
SilcThreadPool tp; /* The thread pool */
+ SilcCond thread_signal; /* Condition variable for signalling */
SilcMutex lock; /* Thread lock */
SilcList queue; /* Queue for waiting calls */
SilcList free_queue; /* Queue freelist */
SilcMutex lock; /* Thread lock */
SilcList queue; /* Queue for waiting calls */
SilcList free_queue; /* Queue freelist */
@@
-62,8
+63,8
@@
typedef struct SilcThreadPoolThreadStruct {
/* Thread pool context */
struct SilcThreadPoolStruct {
SilcStack stack; /* Stack for memory allocation */
/* Thread pool context */
struct SilcThreadPoolStruct {
SilcStack stack; /* Stack for memory allocation */
- SilcMutex lock; /* Pool lock */
SilcCond pool_signal; /* Condition variable for signalling */
SilcCond pool_signal; /* Condition variable for signalling */
+ SilcMutex lock; /* Pool lock */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
SilcUInt16 min_threads; /* Minimum threads in the pool */
SilcList threads; /* Threads in the pool */
SilcList free_threads; /* Threads freelist */
SilcUInt16 min_threads; /* Minimum threads in the pool */
@@
-109,19
+110,20
@@
static void *silc_thread_pool_run_thread(void *context)
{
SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
{
SilcThreadPoolThread t = context, o, q;
SilcThreadPool tp = t->tp;
- SilcMutex lock = t
p
->lock;
- SilcCond
pool_signal = tp->pool
_signal;
+ SilcMutex lock = t->lock;
+ SilcCond
thread_signal = t->thread
_signal;
silc_mutex_lock(lock);
while (1) {
/* Wait here for code to execute */
while (!t->run && !t->stop)
silc_mutex_lock(lock);
while (1) {
/* Wait here for code to execute */
while (!t->run && !t->stop)
- silc_cond_wait(
pool
_signal, lock);
+ silc_cond_wait(
thread
_signal, lock);
if (silc_unlikely(t->stop)) {
if (silc_unlikely(t->stop)) {
- /* Stop the thread. Remove from threads list
and free memory
. */
+ /* Stop the thread. Remove from threads list. */
SILC_LOG_DEBUG(("Stop thread %p", t));
SILC_LOG_DEBUG(("Stop thread %p", t));
+ silc_mutex_lock(tp->lock);
silc_list_del(tp->threads, t);
silc_list_start(tp->threads);
silc_list_del(tp->threads, t);
silc_list_start(tp->threads);
@@
-133,12
+135,15
@@
static void *silc_thread_pool_run_thread(void *context)
while ((q = silc_list_get(t->free_queue)))
silc_sfree(tp->stack, q);
while ((q = silc_list_get(t->free_queue)))
silc_sfree(tp->stack, q);
- silc_mutex_free(t->lock);
+ /* Destroy the thread */
+ silc_mutex_unlock(lock);
+ silc_mutex_free(lock);
+ silc_cond_free(thread_signal);
silc_sfree(tp->stack, t);
/* If we are last thread, signal the waiting destructor. */
if (silc_list_count(tp->threads) == 0)
silc_sfree(tp->stack, t);
/* If we are last thread, signal the waiting destructor. */
if (silc_list_count(tp->threads) == 0)
-
silc_cond_signal(
pool_signal);
+
silc_cond_signal(tp->
pool_signal);
/* Release pool reference. Releases lock also. */
silc_thread_pool_unref(tp);
/* Release pool reference. Releases lock also. */
silc_thread_pool_unref(tp);
@@
-172,8
+177,9
@@
static void *silc_thread_pool_run_thread(void *context)
/* Check if there are calls in queue. Takes the most recently added
call since new ones are added at the start of the list. */
/* Check if there are calls in queue. Takes the most recently added
call since new ones are added at the start of the list. */
- silc_mutex_lock(
t->
lock);
+ silc_mutex_lock(lock);
if (silc_list_count(t->queue) > 0) {
if (silc_list_count(t->queue) > 0) {
+ execute_queue:
silc_list_start(t->queue);
q = silc_list_get(t->queue);
silc_list_start(t->queue);
q = silc_list_get(t->queue);
@@
-188,20
+194,20
@@
static void *silc_thread_pool_run_thread(void *context)
silc_list_del(t->queue, q);
silc_list_add(t->free_queue, q);
silc_list_del(t->queue, q);
silc_list_add(t->free_queue, q);
- silc_mutex_unlock(
t->
lock);
+ silc_mutex_unlock(lock);
goto execute;
} else {
goto execute;
} else {
- silc_mutex_unlock(
t->
lock);
+ silc_mutex_unlock(lock);
/* Nothing to do. Attempt to steal call from some other thread. */
/* Nothing to do. Attempt to steal call from some other thread. */
- silc_mutex_lock(lock);
+ silc_mutex_lock(
tp->
lock);
o = silc_list_get(tp->threads);
if (!o) {
/* List wraps around */
silc_list_start(tp->threads);
o = silc_list_get(tp->threads);
}
o = silc_list_get(tp->threads);
if (!o) {
/* List wraps around */
silc_list_start(tp->threads);
o = silc_list_get(tp->threads);
}
- silc_mutex_unlock(lock);
+ silc_mutex_unlock(
tp->
lock);
if (o && o != t) {
silc_mutex_lock(o->lock);
if (o && o != t) {
silc_mutex_lock(o->lock);
@@
-227,12
+233,16
@@
static void *silc_thread_pool_run_thread(void *context)
}
}
}
}
+ silc_mutex_lock(lock);
+
+ /* Now that we have the lock back, check the queue again. */
+ if (silc_list_count(t->queue) > 0)
+ goto execute_queue;
+
/* The thread is now free for use again. */
t->run = NULL;
t->completion = NULL;
t->schedule = NULL;
/* The thread is now free for use again. */
t->run = NULL;
t->completion = NULL;
t->schedule = NULL;
-
- silc_mutex_lock(lock);
silc_list_add(tp->free_threads, t);
}
silc_list_add(tp->free_threads, t);
}
@@
-254,6
+264,12
@@
static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
return NULL;
}
return NULL;
}
+ if (!silc_cond_alloc(&t->thread_signal)) {
+ silc_mutex_free(t->lock);
+ silc_sfree(tp->stack, t);
+ return NULL;
+ }
+
t->tp = tp;
silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
t->tp = tp;
silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
@@
-342,9
+358,12
@@
void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
/* Stop threads */
silc_list_start(tp->threads);
/* Stop threads */
silc_list_start(tp->threads);
- while ((t = silc_list_get(tp->threads)))
+ while ((t = silc_list_get(tp->threads))) {
+ silc_mutex_lock(t->lock);
t->stop = TRUE;
t->stop = TRUE;
- silc_cond_broadcast(tp->pool_signal);
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
+ }
if (wait_unfinished) {
SILC_LOG_DEBUG(("Wait threads to finish"));
if (wait_unfinished) {
SILC_LOG_DEBUG(("Wait threads to finish"));
@@
-394,12
+413,11
@@
SilcBool silc_thread_pool_run(SilcThreadPool tp,
silc_list_start(tp->threads);
t = silc_list_get(tp->threads);
}
silc_list_start(tp->threads);
t = silc_list_get(tp->threads);
}
+ silc_mutex_unlock(tp->lock);
SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
run, run_context, t));
SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
run, run_context, t));
- /* Lock the thread. Keep also thread pool lock so that this thread
- cannot become free while we're adding call to its queue. */
silc_mutex_lock(t->lock);
/* Get free call context from the list */
silc_mutex_lock(t->lock);
/* Get free call context from the list */
@@
-409,7
+427,6
@@
SilcBool silc_thread_pool_run(SilcThreadPool tp,
q = silc_scalloc(tp->stack, 1, sizeof(*q));
if (!q) {
silc_mutex_unlock(t->lock);
q = silc_scalloc(tp->stack, 1, sizeof(*q));
if (!q) {
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return FALSE;
}
} else {
return FALSE;
}
} else {
@@
-425,7
+442,6
@@
SilcBool silc_thread_pool_run(SilcThreadPool tp,
/* Add at the start of the list. It gets executed first. */
silc_list_insert(t->queue, NULL, q);
silc_mutex_unlock(t->lock);
/* Add at the start of the list. It gets executed first. */
silc_list_insert(t->queue, NULL, q);
silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
} else {
/* Create new thread */
return TRUE;
} else {
/* Create new thread */
@@
-437,20
+453,24
@@
SilcBool silc_thread_pool_run(SilcThreadPool tp,
}
}
}
}
+ silc_list_del(tp->free_threads, t);
+ silc_mutex_unlock(tp->lock);
+
SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
+ silc_mutex_lock(t->lock);
+
/* Mark this call to be executed in this thread */
t->run = run;
t->run_context = run_context;
t->completion = completion;
t->completion_context = completion_context;
t->schedule = schedule;
/* Mark this call to be executed in this thread */
t->run = run;
t->run_context = run_context;
t->completion = completion;
t->completion_context = completion_context;
t->schedule = schedule;
- silc_list_del(tp->free_threads, t);
- /* Signal threads */
- silc_cond_broadcast(tp->pool_signal);
+ /* Signal the thread */
+ silc_cond_signal(t->thread_signal);
+ silc_mutex_unlock(t->lock);
- silc_mutex_unlock(tp->lock);
return TRUE;
}
return TRUE;
}
@@
-519,7
+539,9
@@
void silc_thread_pool_purge(SilcThreadPool tp)
continue;
}
continue;
}
+ /* Signal the thread to stop */
t->stop = TRUE;
t->stop = TRUE;
+ silc_cond_signal(t->thread_signal);
silc_mutex_unlock(t->lock);
silc_list_del(tp->free_threads, t);
silc_mutex_unlock(t->lock);
silc_list_del(tp->free_threads, t);
@@
-529,9
+551,6
@@
void silc_thread_pool_purge(SilcThreadPool tp)
break;
}
break;
}
- /* Signal threads to stop */
- silc_cond_broadcast(tp->pool_signal);
-
silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}
silc_list_start(tp->threads);
silc_mutex_unlock(tp->lock);
}