Changed condition variables thread specific.
authorPekka Riikonen <priikone@silcnet.org>
Thu, 26 Jul 2007 18:18:41 +0000 (18:18 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Thu, 26 Jul 2007 18:18:41 +0000 (18:18 +0000)
lib/silcutil/silcthread.c

index 3a89206b0ededf9c4a01005f62c232bf4516cf6f..ad53a421692192bbcae4a6f5a529f1c5026990e9 100644 (file)
@@ -48,6 +48,7 @@ typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;               /* The thread pool */
+  SilcCond thread_signal;           /* Condition variable for signalling */
   SilcMutex lock;                  /* Thread lock */
   SilcList queue;                  /* Queue for waiting calls */
   SilcList free_queue;             /* Queue freelist */
@@ -62,8 +63,8 @@ typedef struct SilcThreadPoolThreadStruct {
 /* Thread pool context */
 struct SilcThreadPoolStruct {
   SilcStack stack;                 /* Stack for memory allocation */
-  SilcMutex lock;                  /* Pool lock */
   SilcCond pool_signal;                    /* Condition variable for signalling */
+  SilcMutex lock;                  /* Pool lock */
   SilcList threads;                /* Threads in the pool */
   SilcList free_threads;           /* Threads freelist */
   SilcUInt16 min_threads;          /* Minimum threads in the pool */
@@ -109,19 +110,20 @@ static void *silc_thread_pool_run_thread(void *context)
 {
   SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
-  SilcMutex lock = tp->lock;
-  SilcCond pool_signal = tp->pool_signal;
+  SilcMutex lock = t->lock;
+  SilcCond thread_signal = t->thread_signal;
 
   silc_mutex_lock(lock);
 
   while (1) {
     /* Wait here for code to execute */
     while (!t->run && !t->stop)
-      silc_cond_wait(pool_signal, lock);
+      silc_cond_wait(thread_signal, lock);
 
     if (silc_unlikely(t->stop)) {
-      /* Stop the thread.  Remove from threads list and free memory. */
+      /* Stop the thread.  Remove from threads list. */
       SILC_LOG_DEBUG(("Stop thread %p", t));
+      silc_mutex_lock(tp->lock);
       silc_list_del(tp->threads, t);
       silc_list_start(tp->threads);
 
@@ -133,12 +135,15 @@ static void *silc_thread_pool_run_thread(void *context)
       while ((q = silc_list_get(t->free_queue)))
        silc_sfree(tp->stack, q);
 
-      silc_mutex_free(t->lock);
+      /* Destroy the thread */
+      silc_mutex_unlock(lock);
+      silc_mutex_free(lock);
+      silc_cond_free(thread_signal);
       silc_sfree(tp->stack, t);
 
       /* If we are last thread, signal the waiting destructor. */
       if (silc_list_count(tp->threads) == 0)
-       silc_cond_signal(pool_signal);
+        silc_cond_signal(tp->pool_signal);
 
       /* Release pool reference.  Releases lock also. */
       silc_thread_pool_unref(tp);
@@ -172,8 +177,9 @@ static void *silc_thread_pool_run_thread(void *context)
 
     /* Check if there are calls in queue.  Takes the most recently added
        call since new ones are added at the start of the list. */
-    silc_mutex_lock(t->lock);
+    silc_mutex_lock(lock);
     if (silc_list_count(t->queue) > 0) {
+    execute_queue:
       silc_list_start(t->queue);
       q = silc_list_get(t->queue);
 
@@ -188,20 +194,20 @@ static void *silc_thread_pool_run_thread(void *context)
 
       silc_list_del(t->queue, q);
       silc_list_add(t->free_queue, q);
-      silc_mutex_unlock(t->lock);
+      silc_mutex_unlock(lock);
       goto execute;
     } else {
-      silc_mutex_unlock(t->lock);
+      silc_mutex_unlock(lock);
 
       /* Nothing to do.  Attempt to steal call from some other thread. */
-      silc_mutex_lock(lock);
+      silc_mutex_lock(tp->lock);
       o = silc_list_get(tp->threads);
       if (!o) {
        /* List wraps around */
        silc_list_start(tp->threads);
        o = silc_list_get(tp->threads);
       }
-      silc_mutex_unlock(lock);
+      silc_mutex_unlock(tp->lock);
 
       if (o && o != t) {
        silc_mutex_lock(o->lock);
@@ -227,12 +233,16 @@ static void *silc_thread_pool_run_thread(void *context)
       }
     }
 
+    silc_mutex_lock(lock);
+
+    /* Now that we have the lock back, check the queue again. */
+    if (silc_list_count(t->queue) > 0)
+      goto execute_queue;
+
     /* The thread is now free for use again. */
     t->run = NULL;
     t->completion = NULL;
     t->schedule = NULL;
-
-    silc_mutex_lock(lock);
     silc_list_add(tp->free_threads, t);
   }
 
@@ -254,6 +264,12 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
     return NULL;
   }
 
+  if (!silc_cond_alloc(&t->thread_signal)) {
+    silc_mutex_free(t->lock);
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
@@ -342,9 +358,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
 
   /* Stop threads */
   silc_list_start(tp->threads);
-  while ((t = silc_list_get(tp->threads)))
+  while ((t = silc_list_get(tp->threads))) {
+    silc_mutex_lock(t->lock);
     t->stop = TRUE;
-  silc_cond_broadcast(tp->pool_signal);
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+  }
 
   if (wait_unfinished) {
     SILC_LOG_DEBUG(("Wait threads to finish"));
@@ -394,12 +413,11 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
        silc_list_start(tp->threads);
        t = silc_list_get(tp->threads);
       }
+      silc_mutex_unlock(tp->lock);
 
       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
                      run, run_context, t));
 
-      /* Lock the thread.  Keep also thread pool lock so that this thread
-        cannot become free while we're adding call to its queue. */
       silc_mutex_lock(t->lock);
 
       /* Get free call context from the list */
@@ -409,7 +427,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
        q = silc_scalloc(tp->stack, 1, sizeof(*q));
        if (!q) {
          silc_mutex_unlock(t->lock);
-         silc_mutex_unlock(tp->lock);
          return FALSE;
        }
       } else {
@@ -425,7 +442,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
       /* Add at the start of the list.  It gets executed first. */
       silc_list_insert(t->queue, NULL, q);
       silc_mutex_unlock(t->lock);
-      silc_mutex_unlock(tp->lock);
       return TRUE;
     } else {
       /* Create new thread */
@@ -437,20 +453,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
     }
   }
 
+  silc_list_del(tp->free_threads, t);
+  silc_mutex_unlock(tp->lock);
+
   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
 
+  silc_mutex_lock(t->lock);
+
   /* Mark this call to be executed in this thread */
   t->run = run;
   t->run_context = run_context;
   t->completion = completion;
   t->completion_context = completion_context;
   t->schedule = schedule;
-  silc_list_del(tp->free_threads, t);
 
-  /* Signal threads */
-  silc_cond_broadcast(tp->pool_signal);
+  /* Signal the thread */
+  silc_cond_signal(t->thread_signal);
+  silc_mutex_unlock(t->lock);
 
-  silc_mutex_unlock(tp->lock);
   return TRUE;
 }
 
@@ -519,7 +539,9 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       continue;
     }
 
+    /* Signal the thread to stop */
     t->stop = TRUE;
+    silc_cond_signal(t->thread_signal);
     silc_mutex_unlock(t->lock);
 
     silc_list_del(tp->free_threads, t);
@@ -529,9 +551,6 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       break;
   }
 
-  /* Signal threads to stop */
-  silc_cond_broadcast(tp->pool_signal);
-
   silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }