Optimizations to thread pool. Changed call queues thread
authorPekka Riikonen <priikone@silcnet.org>
Thu, 26 Jul 2007 15:25:21 +0000 (15:25 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Thu, 26 Jul 2007 15:25:21 +0000 (15:25 +0000)
specific and calls and executed in last-in-first-out order.

lib/silcutil/silcthread.c
lib/silcutil/silcutil.h
lib/silcutil/tests/test_silcthread.c

index d7c59690d4fce73fa74793a52bbd9dbf93fc8b67..3a89206b0ededf9c4a01005f62c232bf4516cf6f 100644 (file)
 
 #include "silc.h"
 
+/* Explanation of the thread pool execution.
+
+   When new call is added to thread pool by calling silc_thread_pool_run
+   it is assigned to a first free thread from the free list.  If no threads
+   are available we take one from the threads list and assign the call to
+   its queue.  The threads list always takes different thread finally wrapping
+   from the beginning.  This way each thread will get a chance to execute
+   queued calls.
+
+   The thread function silc_thread_pool_run_thread executes each call.  After
+   executing the current call that has been assigned to it, it will check
+   if there are any queued calls in its queue, and it will execute all calls
+   from the queue.  If there aren't any calls in the queue, it will attempt
+   to steal a call from some other thread and execute it.
+
+   The queue list is always consumed in last-in-first-out order.  The most
+   recently added call gets priority.  With full utilization this helps to
+   avoid CPU cache misses.  Since the queues are thread specific with full
+   utilization each thread should always be doing work for the most recent
+   (and thus most important) calls. */
+
 /************************** Types and definitions ***************************/
 
-/* Thread pool thread context */
+/* Thread pool thread context.  Each thread contains the most current call
+   to be executed, and a list of queued calls. */
 typedef struct SilcThreadPoolThreadStruct {
   struct SilcThreadPoolThreadStruct *next;
   struct SilcThreadPoolThreadStruct *next2;
   SilcThreadPool tp;               /* The thread pool */
-  SilcSchedule schedule;           /* Scheduler, may be NULL */
-  SilcThreadPoolFunc run;          /* The function to run in a thread */
-  SilcTaskCallback completion;     /* Completion function */
+  SilcMutex lock;                  /* Thread lock */
+  SilcList queue;                  /* Queue for waiting calls */
+  SilcList free_queue;             /* Queue freelist */
+  SilcSchedule schedule;           /* The current Scheduler, may be NULL */
+  SilcThreadPoolFunc run;          /* The current call to run in a thread */
+  SilcTaskCallback completion;     /* The current Completion function */
   void *run_context;
   void *completion_context;
   unsigned int stop        : 1;            /* Set to stop the thread */
@@ -41,8 +66,6 @@ struct SilcThreadPoolStruct {
   SilcCond pool_signal;                    /* Condition variable for signalling */
   SilcList threads;                /* Threads in the pool */
   SilcList free_threads;           /* Threads freelist */
-  SilcList queue;                  /* Queue for waiting calls */
-  SilcList free_queue;             /* Queue freelist */
   SilcUInt16 min_threads;          /* Minimum threads in the pool */
   SilcUInt16 max_threads;          /* Maximum threads in the pool */
   SilcUInt16 refcnt;               /* Reference counter */
@@ -84,7 +107,7 @@ static void silc_thread_pool_unref(SilcThreadPool tp)
 
 static void *silc_thread_pool_run_thread(void *context)
 {
-  SilcThreadPoolThread t = context, q;
+  SilcThreadPoolThread t = context, o, q;
   SilcThreadPool tp = t->tp;
   SilcMutex lock = tp->lock;
   SilcCond pool_signal = tp->pool_signal;
@@ -96,15 +119,26 @@ static void *silc_thread_pool_run_thread(void *context)
     while (!t->run && !t->stop)
       silc_cond_wait(pool_signal, lock);
 
-    if (t->stop) {
+    if (silc_unlikely(t->stop)) {
       /* Stop the thread.  Remove from threads list and free memory. */
       SILC_LOG_DEBUG(("Stop thread %p", t));
       silc_list_del(tp->threads, t);
+      silc_list_start(tp->threads);
+
+      /* Clear thread's call queue. */
+      silc_list_start(t->queue);
+      silc_list_start(t->free_queue);
+      while ((q = silc_list_get(t->queue)))
+       silc_sfree(tp->stack, q);
+      while ((q = silc_list_get(t->free_queue)))
+       silc_sfree(tp->stack, q);
+
+      silc_mutex_free(t->lock);
       silc_sfree(tp->stack, t);
 
       /* If we are last thread, signal the waiting destructor. */
       if (silc_list_count(tp->threads) == 0)
-       silc_cond_broadcast(pool_signal);
+       silc_cond_signal(pool_signal);
 
       /* Release pool reference.  Releases lock also. */
       silc_thread_pool_unref(tp);
@@ -113,6 +147,7 @@ static void *silc_thread_pool_run_thread(void *context)
     silc_mutex_unlock(lock);
 
     /* Execute code */
+  execute:
     SILC_LOG_DEBUG(("Execute call %p, context %p, thread %p", t->run,
                    t->run_context, t));
     t->run(t->schedule, t->run_context);
@@ -135,12 +170,12 @@ static void *silc_thread_pool_run_thread(void *context)
       }
     }
 
-    silc_mutex_lock(lock);
-
-    /* Check if there are calls in queue */
-    if (silc_list_count(tp->queue) > 0) {
-      silc_list_start(tp->queue);
-      q = silc_list_get(tp->queue);
+    /* Check if there are calls in queue.  Takes the most recently added
+       call since new ones are added at the start of the list. */
+    silc_mutex_lock(t->lock);
+    if (silc_list_count(t->queue) > 0) {
+      silc_list_start(t->queue);
+      q = silc_list_get(t->queue);
 
       SILC_LOG_DEBUG(("Execute call from queue"));
 
@@ -151,15 +186,53 @@ static void *silc_thread_pool_run_thread(void *context)
       t->completion_context = q->completion_context;
       t->schedule = q->schedule;
 
-      silc_list_del(tp->queue, q);
-      silc_list_add(tp->free_queue, q);
-      continue;
+      silc_list_del(t->queue, q);
+      silc_list_add(t->free_queue, q);
+      silc_mutex_unlock(t->lock);
+      goto execute;
+    } else {
+      silc_mutex_unlock(t->lock);
+
+      /* Nothing to do.  Attempt to steal call from some other thread. */
+      silc_mutex_lock(lock);
+      o = silc_list_get(tp->threads);
+      if (!o) {
+       /* List wraps around */
+       silc_list_start(tp->threads);
+       o = silc_list_get(tp->threads);
+      }
+      silc_mutex_unlock(lock);
+
+      if (o && o != t) {
+       silc_mutex_lock(o->lock);
+       if (silc_list_count(o->queue) > 0) {
+         silc_list_start(o->queue);
+         q = silc_list_get(o->queue);
+
+         SILC_LOG_DEBUG(("Execute call from queue from thread %p", o));
+
+         /* Execute this call now */
+         t->run = q->run;
+         t->run_context = q->run_context;
+         t->completion = q->completion;
+         t->completion_context = q->completion_context;
+         t->schedule = q->schedule;
+
+         silc_list_del(o->queue, q);
+         silc_list_add(o->free_queue, q);
+         silc_mutex_unlock(o->lock);
+         goto execute;
+       }
+       silc_mutex_unlock(o->lock);
+      }
     }
 
     /* The thread is now free for use again. */
     t->run = NULL;
     t->completion = NULL;
     t->schedule = NULL;
+
+    silc_mutex_lock(lock);
     silc_list_add(tp->free_threads, t);
   }
 
@@ -175,7 +248,17 @@ static SilcThreadPoolThread silc_thread_pool_new_thread(SilcThreadPool tp)
   t = silc_scalloc(tp->stack, 1, sizeof(*t));
   if (!t)
     return NULL;
+
+  if (!silc_mutex_alloc(&t->lock)) {
+    silc_sfree(tp->stack, t);
+    return NULL;
+  }
+
   t->tp = tp;
+  silc_list_init(t->queue, struct SilcThreadPoolThreadStruct, next);
+  silc_list_init(t->free_queue, struct SilcThreadPoolThreadStruct, next);
+
+  /* Add to thread pool */
   silc_list_add(tp->threads, t);
   silc_list_add(tp->free_threads, t);
   silc_thread_pool_ref(tp);
@@ -202,6 +285,8 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
 
   if (max_threads < min_threads)
     return NULL;
+  if (!max_threads)
+    return NULL;
 
   if (stack)
     stack = silc_stack_alloc(0, stack);
@@ -235,12 +320,12 @@ SilcThreadPool silc_thread_pool_alloc(SilcStack stack,
 
   silc_list_init(tp->threads, struct SilcThreadPoolThreadStruct, next);
   silc_list_init(tp->free_threads, struct SilcThreadPoolThreadStruct, next2);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
 
   for (i = 0; i < tp->min_threads && start_min_threads; i++)
     silc_thread_pool_new_thread(tp);
 
+  silc_list_start(tp->threads);
+
   return tp;
 }
 
@@ -267,16 +352,6 @@ void silc_thread_pool_free(SilcThreadPool tp, SilcBool wait_unfinished)
       silc_cond_wait(tp->pool_signal, tp->lock);
   }
 
-  /* Free calls from queue */
-  silc_list_start(tp->queue);
-  while ((t = silc_list_get(tp->queue)))
-    silc_sfree(tp->stack, t);
-  silc_list_start(tp->free_queue);
-  while ((t = silc_list_get(tp->free_queue)))
-    silc_sfree(tp->stack, t);
-  silc_list_init(tp->queue, struct SilcThreadPoolThreadStruct, next);
-  silc_list_init(tp->free_queue, struct SilcThreadPoolThreadStruct, next);
-
   /* Release reference.  Releases lock also. */
   silc_thread_pool_unref(tp);
 }
@@ -291,7 +366,7 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
                              SilcTaskCallback completion,
                              void *completion_context)
 {
-  SilcThreadPoolThread t;
+  SilcThreadPoolThread t, q;
 
   silc_mutex_lock(tp->lock);
 
@@ -311,28 +386,45 @@ SilcBool silc_thread_pool_run(SilcThreadPool tp,
        return FALSE;
       }
 
-      SILC_LOG_DEBUG(("Queue call %p, context %p", run, run_context));
-
-      /* User wants to queue this call until thread becomes free */
-      silc_list_start(tp->free_queue);
-      t = silc_list_get(tp->free_queue);
+      /* User wants to queue this call until thread becomes free.  Get
+        a thread to assign this call. */
+      t = silc_list_get(tp->threads);
       if (!t) {
-       t = silc_scalloc(tp->stack, 1, sizeof(*t));
-       if (!t) {
+       /* List wraps around */
+       silc_list_start(tp->threads);
+       t = silc_list_get(tp->threads);
+      }
+
+      SILC_LOG_DEBUG(("Queue call %p, context %p in thread %p",
+                     run, run_context, t));
+
+      /* Lock the thread.  Keep also thread pool lock so that this thread
+        cannot become free while we're adding call to its queue. */
+      silc_mutex_lock(t->lock);
+
+      /* Get free call context from the list */
+      silc_list_start(t->free_queue);
+      q = silc_list_get(t->free_queue);
+      if (!q) {
+       q = silc_scalloc(tp->stack, 1, sizeof(*q));
+       if (!q) {
+         silc_mutex_unlock(t->lock);
          silc_mutex_unlock(tp->lock);
          return FALSE;
        }
       } else {
-       silc_list_del(tp->free_queue, t);
+       silc_list_del(t->free_queue, q);
       }
 
-      t->run = run;
-      t->run_context = run_context;
-      t->completion = completion;
-      t->completion_context = completion_context;
-      t->schedule = schedule;
+      q->run = run;
+      q->run_context = run_context;
+      q->completion = completion;
+      q->completion_context = completion_context;
+      q->schedule = schedule;
 
-      silc_list_add(tp->queue, t);
+      /* Add at the start of the list.  It gets executed first. */
+      silc_list_insert(t->queue, NULL, q);
+      silc_mutex_unlock(t->lock);
       silc_mutex_unlock(tp->lock);
       return TRUE;
     } else {
@@ -421,10 +513,15 @@ void silc_thread_pool_purge(SilcThreadPool tp)
 
   silc_list_start(tp->threads);
   while ((t = silc_list_get(tp->threads))) {
-    if (t->run)
+    silc_mutex_lock(t->lock);
+    if (t->run) {
+      silc_mutex_unlock(t->lock);
       continue;
+    }
 
     t->stop = TRUE;
+    silc_mutex_unlock(t->lock);
+
     silc_list_del(tp->free_threads, t);
 
     i--;
@@ -435,5 +532,6 @@ void silc_thread_pool_purge(SilcThreadPool tp)
   /* Signal threads to stop */
   silc_cond_broadcast(tp->pool_signal);
 
+  silc_list_start(tp->threads);
   silc_mutex_unlock(tp->lock);
 }
index 300c0a43ee8a22c8b5ef926da50a5d7cf6daf74b..e62e768d15d0bede90cb82b3f191969e0ea07bde 100644 (file)
@@ -156,8 +156,7 @@ char *silc_format(char *fmt, ...);
  *
  *    Basic has function to hash strings. May be used with the SilcHashTable.
  *    Note that this lowers the characters of the string (with tolower()) so
- *    this is used usually with nicknames, channel and server names to provide
- *    case insensitive keys.
+ *    this can be used to provide case-insensitive hashing.
  *
  ***/
 SilcUInt32 silc_hash_string(void *key, void *user_context);
@@ -251,8 +250,8 @@ SilcUInt32 silc_hash_data(void *key, void *user_context);
  *
  * DESCRIPTION
  *
- *    Compares two strings. It may be used as SilcHashTable comparison
- *    function.
+ *    Compares two strings. This ignores the case while comparing.  It may
+ *    be used as SilcHashTable comparison function.
  *
  ***/
 SilcBool silc_hash_string_compare(void *key1, void *key2, void *user_context);
index 6305d7c100b2eede7934e1b81fe12d2cdfa54d87..d59ed4e043066bea2b2a6e3e131e8def680aa8db 100644 (file)
@@ -46,13 +46,13 @@ int main(int argc, char **argv)
   tp = silc_thread_pool_alloc(NULL, 0, 2, FALSE);
   if (!tp)
     goto err;
-  for (i = 0; i < 4; i++) {
+  for (i = 0; i < 6; i++) {
     SILC_LOG_DEBUG(("Run thread %d", i + 1));
     if (!silc_thread_pool_run(tp, TRUE, NULL, func, (void *) i + 1,
                              compl, (void *)i + 1))
       goto err;
   }
-  sleep(4);
+  sleep(6);
   SILC_LOG_DEBUG(("Stop thread pool"));
   silc_thread_pool_free(tp, TRUE);