Added SILC Thread Queue API
[silc.git] / lib / silcutil / silcthread.c
index 3a89206b0ededf9c4a01005f62c232bf4516cf6f..8e51a15d13ef4c182382cf029f3b0f41512c3deb 100644 (file)
@@ -19,6 +19,8 @@
 
 #include "silc.h"
 
+/***************************** Thread Pool API *****************************/
+
 /* Explanation of the thread pool execution.
 
    When new call is added to thread pool by calling silc_thread_pool_run
@@ -48,6 +50,7 @@ typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;               /* The thread pool */
+  SilcCond thread_signal;           /* Condition variable for signalling */
   SilcMutex lock;                  /* Thread lock */
   SilcList queue;                  /* Queue for waiting calls */
   SilcList free_queue;             /* Queue freelist */
@@ -62,8 +65,8 @@ typedef struct SilcThreadPoolThreadStruct {
 /* Thread pool context */
 struct SilcThreadPoolStruct {
   SilcStack stack;                 /* Stack for memory allocation */
-  SilcMutex lock;                  /* Pool lock */
   SilcCond pool_signal;                    /* Condition variable for signalling */
+  SilcMutex lock;                  /* Pool lock */
   SilcList threads;                /* Threads in the pool */
   SilcList free_threads;           /* Threads freelist */
   SilcUInt16 min_threads;          /* Minimum threads in the pool */
@@ -109,44 +112,21 @@ static void *silc_thread_pool_run_thread(void *context)
 {
   SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
-  SilcMutex lock = tp->lock;
-  SilcCond pool_signal = tp->pool_signal;
+  SilcMutex lock = t->lock;
+  SilcCond thread_signal = t->thread_signal;
 
   silc_mutex_lock(lock);
 
   while (1) {
     /* Wait here for code to execute */
     while (!t->run && !t->stop)
-      silc_cond_wait(pool_signal, lock);
-
-    if (silc_unlikely(t->stop)) {
-      /* Stop the thread.  Remove from threads list and free memory. */
-      SILC_LOG_DEBUG(("Stop thread %p", t));
-      silc_list_del(tp->threads, t);
-      silc_list_start(tp->threads);
-
-      /* Clear thread's call queue. */
-      silc_list_start(t->queue);
-      silc_list_start(t->free_queue);
-      while ((q = silc_list_get(t->queue)))
-       silc_sfree(tp->stack, q);
-      while ((q = silc_list_get(t->free_queue)))
-       silc_sfree(tp->stack, q);
-
-      silc_mutex_free(t->lock);
-      silc_sfree(tp->stack, t);
+      silc_cond_wait(thread_signal, lock);
 
-      /* If we are last thread, signal the waiting destructor. */
-      if (silc_list_count(tp->threads) == 0)
-       silc_cond_signal(pool_signal);
-
-      /* Release pool reference.  Releases lock also. */
-      silc_thread_pool_unref(tp);
-      break;
-    }
-    silc_mutex_unlock(lock);
+    if (t->stop)
+      goto stop;
 
     /* Execute code */
+    silc_mutex_unlock(lock);
   execute:
     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
                    t->run_context, t));
@@ -170,10 +150,14 @@ static void *silc_thread_pool_run_thread(void *context)
       }
     }
 
+    silc_mutex_lock(lock);
+    if (t->stop)
+      goto stop;
+
     /* Check if there are calls in queue.  Takes the most recently added
        call since new ones are added at the start of the list. */
-    silc_mutex_lock(t->lock);
     if (silc_list_count(t->queue) > 0) {
+    execute_queue:
       silc_list_start(t->queue);
       q = silc_list_get(t->queue);
 
@@ -188,54 +172,100 @@ static void *silc_thread_pool_run_thread(void *context)
 
       silc_list_del(t->queue, q);
       silc_list_add(t->free_queue, q);
-      silc_mutex_unlock(t->lock);
+      silc_mutex_unlock(lock);
       goto execute;
-    } else {
-      silc_mutex_unlock(t->lock);
+    }
+
+    silc_mutex_unlock(lock);
+    silc_mutex_lock(tp->lock);
 
-      /* Nothing to do.  Attempt to steal call from some other thread. */
-      silc_mutex_lock(lock);
+    /* Nothing to do.  Attempt to steal call from some other thread. */
+    o = silc_list_get(tp->threads);
+    if (!o) {
+      /* List wraps around */
+      silc_list_start(tp->threads);
       o = silc_list_get(tp->threads);
-      if (!o) {
-       /* List wraps around */
-       silc_list_start(tp->threads);
-       o = silc_list_get(tp->threads);
-      }
-      silc_mutex_unlock(lock);
+    }
 
-      if (o && o != t) {
-       silc_mutex_lock(o->lock);
-       if (silc_list_count(o->queue) > 0) {
-         silc_list_start(o->queue);
-         q = silc_list_get(o->queue);
-
-         SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
-
-         /* Execute this call now */
-         t->run = q->run;
-         t->run_context = q->run_context;
-         t->completion = q->completion;
-         t->completion_context = q->completion_context;
-         t->schedule = q->schedule;
-
-         silc_list_del(o->queue, q);
-         silc_list_add(o->free_queue, q);
-         silc_mutex_unlock(o->lock);
-         goto execute;
-       }
-       silc_mutex_unlock(o->lock);
-      }
+    /* Check that the other thread is valid and has something to execute. */
+    silc_mutex_lock(o->lock);
+    if (o == t || o->stop || silc_list_count(o->queue) == 0) {
+      silc_mutex_unlock(o->lock);
+      o = NULL;
+    }
+
+    if (o) {
+      silc_mutex_unlock(tp->lock);
+      silc_list_start(o->queue);
+      q = silc_list_get(o->queue);
+
+      SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+      /* Execute this call now */
+      t->run = q->run;
+      t->run_context = q->run_context;
+      t->completion = q->completion;
+      t->completion_context = q->completion_context;
+      t->schedule = q->schedule;
+
+      silc_list_del(o->queue, q);
+      silc_list_add(o->free_queue, q);
+      silc_mutex_unlock(o->lock);
+      goto execute;
+    }
+
+    silc_mutex_lock(lock);
+    if (t->stop) {
+      silc_mutex_unlock(tp->lock);
+      goto stop;
+    }
+
+    /* Now that we have the lock back, check the queue again. */
+    if (silc_list_count(t->queue) > 0) {
+      silc_mutex_unlock(tp->lock);
+      goto execute_queue;
     }
 
     /* The thread is now free for use again. */
     t->run = NULL;
     t->completion = NULL;
     t->schedule = NULL;
-
-    silc_mutex_lock(lock);
     silc_list_add(tp->free_threads, t);
+    silc_mutex_unlock(tp->lock);
   }
 
+ stop:
+  /* Stop the thread.  Remove from threads list. */
+  SILC_LOG_DEBUG(("Stop thread %p", t));
+
+  /* We can unlock the thread now.  After we get the thread pool lock
+     no one can retrieve the thread anymore. */
+  silc_mutex_unlock(lock);
+  silc_mutex_lock(tp->lock);
+
+  silc_list_del(tp->threads, t);
+  silc_list_start(tp->threads);
+
+  /* Clear thread's call queue. */
+  silc_list_start(t->queue);
+  silc_list_start(t->free_queue);
+  while ((q = silc_list_get(t->queue)))
+    silc_sfree(tp->stack, q);
+  while ((q = silc_list_get(t->free_queue)))
+    silc_sfree(tp->stack, q);
+
+  /* Destroy the thread */
+  silc_mutex_free(lock);
+  silc_cond_free(thread_signal);
+  silc_sfree(tp->stack, t);
+
+  /* If we are last thread, signal the waiting destructor. */
+  if (silc_list_count(tp->threads) == 0)
+    silc_cond_signal(tp->pool_signal);
+
+  /* Release pool reference.  Releases lock also. */
+  silc_thread_pool_unref(tp);
+
   return NULL;
 }
 
@@ -254,6 +284,12 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
     return NULL;
   }
 
+  if (!silc_cond_alloc(&t->thread_signal)) {
+    silc_mutex_free(t->lock);
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
   silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
@@ -283,10 +319,16 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
   SilcThreadPool tp;
   int i;
 
-  if (max_threads < min_threads)
+  if (max_threads < min_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
+                         "Max threads is smaller than min threads (%d < %d)",
+                         max_threads, min_threads);
     return NULL;
-  if (!max_threads)
+  }
+  if (!max_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
     return NULL;
+  }
 
   if (stack)
     stack = silc_stack_alloc(0, stack);
@@ -342,9 +384,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
 
   /* Stop threads */
   silc_list_start(tp->threads);
-  while ((t = silc_list_get(tp->threads)))
+  while ((t = silc_list_get(tp->threads))) {
+    silc_mutex_lock(t->lock);
     t->stop = TRUE;
-  silc_cond_broadcast(tp->pool_signal);
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+  }
 
   if (wait_unfinished) {
     SILC_LOG_DEBUG(("Wait threads to finish"));
@@ -372,17 +417,19 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
 
   if (tp->destroy) {
     silc_mutex_unlock(tp->lock);
+    silc_set_errno(SILC_ERR_NOT_VALID);
     return FALSE;
   }
 
   /* Get free thread */
   silc_list_start(tp->free_threads);
   t = silc_list_get(tp->free_threads);
-  if (!t) {
+  if (!t || t->stop) {
     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
       /* Maximum threads reached */
       if (!queuable) {
        silc_mutex_unlock(tp->lock);
+       silc_set_errno(SILC_ERR_LIMIT);
        return FALSE;
       }
 
@@ -394,12 +441,11 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
        silc_list_start(tp->threads);
        t = silc_list_get(tp->threads);
       }
+      silc_mutex_unlock(tp->lock);
 
       SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
                      run, run_context, t));
 
-      /* Lock the thread.  Keep also thread pool lock so that this thread
-        cannot become free while we're adding call to its queue. */
       silc_mutex_lock(t->lock);
 
       /* Get free call context from the list */
@@ -409,7 +455,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
        q = silc_scalloc(tp->stack, 1, sizeof(*q));
        if (!q) {
          silc_mutex_unlock(t->lock);
-         silc_mutex_unlock(tp->lock);
          return FALSE;
        }
       } else {
@@ -425,7 +470,6 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
       /* Add at the start of the list.  It gets executed first. */
       silc_list_insert(t->queue, NULL, q);
       silc_mutex_unlock(t->lock);
-      silc_mutex_unlock(tp->lock);
       return TRUE;
     } else {
       /* Create new thread */
@@ -437,20 +481,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
     }
   }
 
+  silc_list_del(tp->free_threads, t);
+  silc_mutex_unlock(tp->lock);
+
   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
 
+  silc_mutex_lock(t->lock);
+
   /* Mark this call to be executed in this thread */
   t->run = run;
   t->run_context = run_context;
   t->completion = completion;
   t->completion_context = completion_context;
   t->schedule = schedule;
-  silc_list_del(tp->free_threads, t);
 
-  /* Signal threads */
-  silc_cond_broadcast(tp->pool_signal);
+  /* Signal the thread */
+  silc_cond_signal(t->thread_signal);
+  silc_mutex_unlock(t->lock);
 
-  silc_mutex_unlock(tp->lock);
   return TRUE;
 }
 
@@ -519,7 +567,9 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       continue;
     }
 
+    /* Signal the thread to stop */
     t->stop = TRUE;
+    silc_cond_signal(t->thread_signal);
     silc_mutex_unlock(t->lock);
 
     silc_list_del(tp->free_threads, t);
@@ -529,9 +579,30 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       break;
   }
 
-  /* Signal threads to stop */
-  silc_cond_broadcast(tp->pool_signal);
-
   silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }
+
+/*************************** Thread-local Storage ***************************/
+
+void silc_thread_tls_set(void *context)
+{
+  SilcTls tls = silc_thread_get_tls();
+
+  if (!tls) {
+    /* Initialize Tls for this thread */
+    tls = silc_thread_tls_init();
+    if (!tls)
+      return;
+  }
+
+  tls->thread_context = context;
+}
+
+void *silc_thread_tls_get(void)
+{
+  SilcTls tls = silc_thread_get_tls();
+  if (!tls)
+    return NULL;
+  return tls->thread_context;
+}