Added SILC Thread Queue API
[silc.git] / lib / silcutil / silcthread.c
index 23bd1f4ce38421d9733ea429cee5369e6030e8db..8e51a15d13ef4c182382cf029f3b0f41512c3deb 100644 (file)
 
 #include "silc.h"
 
+/***************************** Thread Pool API *****************************/
+
+/* Explanation of the thread pool execution.
+
+   When new call is added to thread pool by calling silc_thread_pool_run
+   it is assigned to a first free thread from the free list.  If no threads
+   are available we take one from the threads list and assign the call to
+   its queue.  The threads list always takes different thread finally wrapping
+   from the beginning.  This way each thread will get a chance to execute
+   queued calls.
+
+   The thread function silc_thread_pool_run_thread executes each call.  After
+   executing the current call that has been assigned to it, it will check
+   if there are any queued calls in its queue, and it will execute all calls
+   from the queue.  If there aren't any calls in the queue, it will attempt
+   to steal a call from some other thread and execute it.
+
+   The queue list is always consumed in last-in-first-out order.  The most
+   recently added call gets priority.  With full utilization this helps to
+   avoid CPU cache misses.  Since the queues are thread specific with full
+   utilization each thread should always be doing work for the most recent
+   (and thus most important) calls. */
+
 /************************** Types and definitions ***************************/
 
-/* Thread pool thread context */
+/* Thread pool thread context.  Each thread contains the most current call
+   to be executed, and a list of queued calls. */
 typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;               /* The thread pool */
-  SilcSchedule schedule;           /* Scheduler, may be NULL */
-  SilcThreadPoolFunc run;          /* The function to run in a thread */
-  SilcTaskCallback completion;     /* Completion function */
+  SilcCond thread_signal;           /* Condition variable for signalling */
+  SilcMutex lock;                  /* Thread lock */
+  SilcList queue;                  /* Queue for waiting calls */
+  SilcList free_queue;             /* Queue freelist */
+  SilcSchedule schedule;           /* The current Scheduler, may be NULL */
+  SilcThreadPoolFunc run;          /* The current call to run in a thread */
+  SilcTaskCallback completion;     /* The current Completion function */
   void *run_context;
   void *completion_context;
   unsigned int stop        : 1;            /* Set to stop the thread */
@@ -37,11 +65,10 @@ typedef struct SilcThreadPoolThreadStruct {
 /* Thread pool context */
 struct SilcThreadPoolStruct {
   SilcStack stack;                 /* Stack for memory allocation */
-  SilcMutex lock;                  /* Pool lock */
   SilcCond pool_signal;                    /* Condition variable for signalling */
+  SilcMutex lock;                  /* Pool lock */
   SilcList threads;                /* Threads in the pool */
   SilcList free_threads;           /* Threads freelist */
-  SilcList queue;                  /* Queue for waiting calls */
   SilcUInt16 min_threads;          /* Minimum threads in the pool */
   SilcUInt16 max_threads;          /* Maximum threads in the pool */
   SilcUInt16 refcnt;               /* Reference counter */
@@ -83,35 +110,24 @@ static void silc_thread_pool_unref(SilcThreadPool tp)
 
 static void *silc_thread_pool_run_thread(void *context)
 {
-  SilcThreadPoolThread t = context, q;
+  SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
-  SilcMutex lock = tp->lock;
-  SilcCond pool_signal = tp->pool_signal;
+  SilcMutex lock = t->lock;
+  SilcCond thread_signal = t->thread_signal;
 
   silc_mutex_lock(lock);
 
   while (1) {
     /* Wait here for code to execute */
     while (!t->run && !t->stop)
-      silc_cond_wait(pool_signal, lock);
-
-    if (t->stop) {
-      /* Stop the thread.  Remove from threads list and free memory. */
-      SILC_LOG_DEBUG(("Stop thread %p", t));
-      silc_list_del(tp->threads, t);
-      silc_sfree(tp->stack, t);
+      silc_cond_wait(thread_signal, lock);
 
-      /* If we are last thread, signal the waiting destructor. */
-      if (silc_list_count(tp->threads) == 0)
-       silc_cond_broadcast(pool_signal);
-
-      /* Release pool reference.  Releases lock also. */
-      silc_thread_pool_unref(tp);
-      break;
-    }
-    silc_mutex_unlock(lock);
+    if (t->stop)
+      goto stop;
 
     /* Execute code */
+    silc_mutex_unlock(lock);
+  execute:
     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
                    t->run_context, t));
     t->run(t->schedule, t->run_context);
@@ -135,11 +151,15 @@ static void *silc_thread_pool_run_thread(void *context)
     }
 
     silc_mutex_lock(lock);
+    if (t->stop)
+      goto stop;
 
-    /* Check if there are calls in queue */
-    if (silc_list_count(tp->queue) > 0) {
-      silc_list_start(tp->queue);
-      q = silc_list_get(tp->queue);
+    /* Check if there are calls in queue.  Takes the most recently added
+       call since new ones are added at the start of the list. */
+    if (silc_list_count(t->queue) > 0) {
+    execute_queue:
+      silc_list_start(t->queue);
+      q = silc_list_get(t->queue);
 
       SILC_LOG_DEBUG(("Execute call from queue"));
 
@@ -150,9 +170,60 @@ static void *silc_thread_pool_run_thread(void *context)
       t->completion_context = q->completion_context;
       t->schedule = q->schedule;
 
-      silc_list_del(tp->queue, q);
-      silc_sfree(tp->stack, q);
-      continue;
+      silc_list_del(t->queue, q);
+      silc_list_add(t->free_queue, q);
+      silc_mutex_unlock(lock);
+      goto execute;
+    }
+
+    silc_mutex_unlock(lock);
+    silc_mutex_lock(tp->lock);
+
+    /* Nothing to do.  Attempt to steal call from some other thread. */
+    o = silc_list_get(tp->threads);
+    if (!o) {
+      /* List wraps around */
+      silc_list_start(tp->threads);
+      o = silc_list_get(tp->threads);
+    }
+
+    /* Check that the other thread is valid and has something to execute. */
+    silc_mutex_lock(o->lock);
+    if (o == t || o->stop || silc_list_count(o->queue) == 0) {
+      silc_mutex_unlock(o->lock);
+      o = NULL;
+    }
+
+    if (o) {
+      silc_mutex_unlock(tp->lock);
+      silc_list_start(o->queue);
+      q = silc_list_get(o->queue);
+
+      SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+      /* Execute this call now */
+      t->run = q->run;
+      t->run_context = q->run_context;
+      t->completion = q->completion;
+      t->completion_context = q->completion_context;
+      t->schedule = q->schedule;
+
+      silc_list_del(o->queue, q);
+      silc_list_add(o->free_queue, q);
+      silc_mutex_unlock(o->lock);
+      goto execute;
+    }
+
+    silc_mutex_lock(lock);
+    if (t->stop) {
+      silc_mutex_unlock(tp->lock);
+      goto stop;
+    }
+
+    /* Now that we have the lock back, check the queue again. */
+    if (silc_list_count(t->queue) > 0) {
+      silc_mutex_unlock(tp->lock);
+      goto execute_queue;
     }
 
     /* The thread is now free for use again. */
@@ -160,8 +231,41 @@ static void *silc_thread_pool_run_thread(void *context)
     t->completion = NULL;
     t->schedule = NULL;
     silc_list_add(tp->free_threads, t);
+    silc_mutex_unlock(tp->lock);
   }
 
+ stop:
+  /* Stop the thread.  Remove from threads list. */
+  SILC_LOG_DEBUG(("Stop thread %p", t));
+
+  /* We can unlock the thread now.  After we get the thread pool lock
+     no one can retrieve the thread anymore. */
+  silc_mutex_unlock(lock);
+  silc_mutex_lock(tp->lock);
+
+  silc_list_del(tp->threads, t);
+  silc_list_start(tp->threads);
+
+  /* Clear thread's call queue. */
+  silc_list_start(t->queue);
+  silc_list_start(t->free_queue);
+  while ((q = silc_list_get(t->queue)))
+    silc_sfree(tp->stack, q);
+  while ((q = silc_list_get(t->free_queue)))
+    silc_sfree(tp->stack, q);
+
+  /* Destroy the thread */
+  silc_mutex_free(lock);
+  silc_cond_free(thread_signal);
+  silc_sfree(tp->stack, t);
+
+  /* If we are last thread, signal the waiting destructor. */
+  if (silc_list_count(tp->threads) == 0)
+    silc_cond_signal(tp->pool_signal);
+
+  /* Release pool reference.  Releases lock also. */
+  silc_thread_pool_unref(tp);
+
   return NULL;
 }
 
@@ -174,7 +278,23 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
   t = silc_scalloc(tp->stack, 1, sizeof(*t));
   if (!t)
     return NULL;
+
+  if (!silc_mutex_alloc(&t->lock)) {
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
+  if (!silc_cond_alloc(&t->thread_signal)) {
+    silc_mutex_free(t->lock);
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
+  silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+  silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+  /* Add to thread pool */
   silc_list_add(tp->threads, t);
   silc_list_add(tp->free_threads, t);
   silc_thread_pool_ref(tp);
@@ -199,8 +319,16 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
   SilcThreadPool tp;
   int i;
 
-  if (max_threads < min_threads)
+  if (max_threads < min_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT,
+                         "Max threads is smaller than min threads (%d < %d)",
+                         max_threads, min_threads);
     return NULL;
+  }
+  if (!max_threads) {
+    silc_set_errno_reason(SILC_ERR_INVALID_ARGUMENT, "Max threads is 0");
+    return NULL;
+  }
 
   if (stack)
     stack = silc_stack_alloc(0, stack);
@@ -234,11 +362,12 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
 
   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
 
   for (i = 0; i < tp->min_threads && start_min_threads; i++)
     silc_thread_pool_new_thread(tp);
 
+  silc_list_start(tp->threads);
+
   return tp;
 }
 
@@ -255,9 +384,12 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
 
   /* Stop threads */
   silc_list_start(tp->threads);
-  while ((t = silc_list_get(tp->threads)))
+  while ((t = silc_list_get(tp->threads))) {
+    silc_mutex_lock(t->lock);
     t->stop = TRUE;
-  silc_cond_broadcast(tp->pool_signal);
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+  }
 
   if (wait_unfinished) {
     SILC_LOG_DEBUG(("Wait threads to finish"));
@@ -265,12 +397,6 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
       silc_cond_wait(tp->pool_signal, tp->lock);
   }
 
-  /* Free calls from queue */
-  silc_list_start(tp->queue);
-  while ((t = silc_list_get(tp->queue)))
-    silc_sfree(tp->stack, t);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-
   /* Release reference.  Releases lock also. */
   silc_thread_pool_unref(tp);
 }
@@ -285,43 +411,65 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
                              SilcTaskCallback completion,
                              void *completion_context)
 {
-  SilcThreadPoolThread t;
+  SilcThreadPoolThread t, q;
 
   silc_mutex_lock(tp->lock);
 
   if (tp->destroy) {
     silc_mutex_unlock(tp->lock);
+    silc_set_errno(SILC_ERR_NOT_VALID);
     return FALSE;
   }
 
   /* Get free thread */
   silc_list_start(tp->free_threads);
   t = silc_list_get(tp->free_threads);
-  if (!t) {
+  if (!t || t->stop) {
     if (silc_list_count(tp->threads) + 1 > tp->max_threads) {
       /* Maximum threads reached */
       if (!queuable) {
        silc_mutex_unlock(tp->lock);
+       silc_set_errno(SILC_ERR_LIMIT);
        return FALSE;
       }
 
-      SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
-      /* User wants to queue this call until thread becomes free */
-      t = silc_scalloc(tp->stack, 1, sizeof(*t));
+      /* User wants to queue this call until thread becomes free.  Get
+        a thread to assign this call. */
+      t = silc_list_get(tp->threads);
       if (!t) {
-       silc_mutex_unlock(tp->lock);
-       return FALSE;
+       /* List wraps around */
+       silc_list_start(tp->threads);
+       t = silc_list_get(tp->threads);
       }
+      silc_mutex_unlock(tp->lock);
 
-      t->run = run;
-      t->run_context = run_context;
-      t->completion = completion;
-      t->completion_context = completion_context;
-      t->schedule = schedule;
+      SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+                     run, run_context, t));
 
-      silc_list_add(tp->queue, t);
-      silc_mutex_unlock(tp->lock);
+      silc_mutex_lock(t->lock);
+
+      /* Get free call context from the list */
+      silc_list_start(t->free_queue);
+      q = silc_list_get(t->free_queue);
+      if (!q) {
+       q = silc_scalloc(tp->stack, 1, sizeof(*q));
+       if (!q) {
+         silc_mutex_unlock(t->lock);
+         return FALSE;
+       }
+      } else {
+       silc_list_del(t->free_queue, q);
+      }
+
+      q->run = run;
+      q->run_context = run_context;
+      q->completion = completion;
+      q->completion_context = completion_context;
+      q->schedule = schedule;
+
+      /* Add at the start of the list.  It gets executed first. */
+      silc_list_insert(t->queue, NULL, q);
+      silc_mutex_unlock(t->lock);
       return TRUE;
     } else {
       /* Create new thread */
@@ -333,20 +481,24 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
     }
   }
 
+  silc_list_del(tp->free_threads, t);
+  silc_mutex_unlock(tp->lock);
+
   SILC_LOG_DEBUG(("Run call %p, context %p, thread %p", run, run_context, t));
 
+  silc_mutex_lock(t->lock);
+
   /* Mark this call to be executed in this thread */
   t->run = run;
   t->run_context = run_context;
   t->completion = completion;
   t->completion_context = completion_context;
   t->schedule = schedule;
-  silc_list_del(tp->free_threads, t);
 
-  /* Signal threads */
-  silc_cond_broadcast(tp->pool_signal);
+  /* Signal the thread */
+  silc_cond_signal(t->thread_signal);
+  silc_mutex_unlock(t->lock);
 
-  silc_mutex_unlock(tp->lock);
   return TRUE;
 }
 
@@ -409,10 +561,17 @@ void silc_thread_pool_purge(SilcThreadPool tp)
 
   silc_list_start(tp->threads);
   while ((t = silc_list_get(tp->threads))) {
-    if (t->run)
+    silc_mutex_lock(t->lock);
+    if (t->run) {
+      silc_mutex_unlock(t->lock);
       continue;
+    }
 
+    /* Signal the thread to stop */
     t->stop = TRUE;
+    silc_cond_signal(t->thread_signal);
+    silc_mutex_unlock(t->lock);
+
     silc_list_del(tp->free_threads, t);
 
     i--;
@@ -420,8 +579,30 @@ void silc_thread_pool_purge(SilcThreadPool tp)
       break;
   }
 
-  /* Signal threads to stop */
-  silc_cond_broadcast(tp->pool_signal);
-
+  silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }
+
+/*************************** Thread-local Storage ***************************/
+
+void silc_thread_tls_set(void *context)
+{
+  SilcTls tls = silc_thread_get_tls();
+
+  if (!tls) {
+    /* Initialize Tls for this thread */
+    tls = silc_thread_tls_init();
+    if (!tls)
+      return;
+  }
+
+  tls->thread_context = context;
+}
+
+void *silc_thread_tls_get(void)
+{
+  SilcTls tls = silc_thread_get_tls();
+  if (!tls)
+    return NULL;
+  return tls->thread_context;
+}