Major rewrite of the SilcThreadQueue API
authorPekka Riikonen <priikone@silcnet.org>
Sat, 23 Feb 2008 13:46:52 +0000 (15:46 +0200)
committerPekka Riikonen <priikone@silcnet.org>
Sat, 23 Feb 2008 13:46:52 +0000 (15:46 +0200)
The new API allows creation of queues with multiple pipes.  Each pipe
is independently lockable and waitable.  A queue with one pipe is same
as the old SilcThreadQueue.

The queue now also supports FIFO order when popping data from the pipes.
By default the order is LIFO.

TODO
lib/silcutil/silcthreadqueue.c
lib/silcutil/silcthreadqueue.h
lib/silcutil/tests/test_silcthreadqueue.c

diff --git a/TODO b/TODO
index 7f116c959882044a3a3b4dea4b3cf395b1963a39..e907469ae69eee172f41a3210a49bdfafb44902e 100644 (file)
--- a/TODO
+++ b/TODO
@@ -164,10 +164,21 @@ Runtime library, lib/silcutil/
    rwlock implementation using atomic operations.) not for now.
 
 
    rwlock implementation using atomic operations.) not for now.
 
 
-lib/silcutil/symbian/
-=====================
+Windows Support
+===============
+
+
+Symbian OS Support
+==================
 
  o Something needs to be thought to the logging globals as well,
    like silc_debug etc.  They won't work on EPOC.  Perhaps logging
    and debugging is to be disabled on EPOC.  The logging currently works
 
  o Something needs to be thought to the logging globals as well,
    like silc_debug etc.  They won't work on EPOC.  Perhaps logging
    and debugging is to be disabled on EPOC.  The logging currently works
-   by it cannot be controlled, same with debugging.
+   by it cannot be controlled, same with debugging.  SILC Global API
+   MUST be used with all globals on Symbian.
+
+ o gethostname() returns "Function not implemented".  Others may return
+   the same.  We should fix that probably to use RHostResolver and
+   GetHostName().
+
+ o silc_thread_exit should call User::Exit().
index 1ca9ef3b0b00094f18973fd3107bcb2afb0b6adc..324063fc9755e5273891920410362e466b9024f5 100644 (file)
 
 /************************** Types and definitions ***************************/
 
 
 /************************** Types and definitions ***************************/
 
-/* Thread queue context */
-struct SilcThreadQueueStruct {
-  SilcDList queue;             /* The queue */
+/* Queue data context */
+typedef struct SilcThreadQueueDataStruct {
+  struct SilcThreadQueueDataStruct *next;
+  void *data;                  /* User data */
+} *SilcThreadQueueData;
+
+/* Pipe */
+typedef struct SilcThreadQueuePipeStruct {
+  SilcList queue;              /* The queue */
+  SilcList freelist;           /* Free list of queue data contexts */
   SilcMutex lock;              /* Queue lock */
   SilcCond cond;               /* Condition for waiting */
   SilcMutex lock;              /* Queue lock */
   SilcCond cond;               /* Condition for waiting */
+} *SilcThreadQueuePipe;
+
+/* Thread queue context */
+struct SilcThreadQueueStruct {
+  SilcThreadQueuePipe pipes;   /* Queue pipes */
   SilcAtomic32 connected;      /* Number of connected threads */
   SilcAtomic32 connected;      /* Number of connected threads */
+  unsigned int num_pipes  : 31;        /* Number of pipes */
+  unsigned int fifo       : 1; /* FIFO */
 };
 
 /************************** SILC Thread Queue API ***************************/
 
 /* Allocate thread queue */
 
 };
 
 /************************** SILC Thread Queue API ***************************/
 
 /* Allocate thread queue */
 
-SilcThreadQueue silc_thread_queue_alloc(void)
+SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo)
 {
   SilcThreadQueue queue;
 {
   SilcThreadQueue queue;
+  SilcUInt32 i;
+
+  if (!num_pipes)
+    num_pipes = 1;
 
   queue = silc_calloc(1, sizeof(*queue));
   if (!queue)
     return NULL;
 
 
   queue = silc_calloc(1, sizeof(*queue));
   if (!queue)
     return NULL;
 
-  SILC_LOG_DEBUG(("Allocated thread queue %p", queue));
-
-  if (!silc_mutex_alloc(&queue->lock)) {
-    silc_free(queue);
-    return NULL;
-  }
+  SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue,
+                 num_pipes, fifo ? "FIFO" : ""));
 
 
-  if (!silc_cond_alloc(&queue->cond)) {
-    silc_mutex_free(queue->lock);
+  queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes));
+  if (!queue->pipes) {
     silc_free(queue);
     return NULL;
   }
     silc_free(queue);
     return NULL;
   }
-
-  queue->queue = silc_dlist_init();
-  if (!queue->queue) {
-    silc_cond_free(queue->cond);
-    silc_mutex_free(queue->lock);
-    silc_free(queue);
-    return NULL;
+  queue->num_pipes = num_pipes;
+  queue->fifo = fifo;
+
+  for (i = 0; i < num_pipes; i++) {
+    silc_list_init(queue->pipes[i].queue,
+                  struct SilcThreadQueueDataStruct, next);
+    silc_list_init(queue->pipes[i].freelist,
+                  struct SilcThreadQueueDataStruct, next);
+    silc_mutex_alloc(&queue->pipes[i].lock);
+    silc_cond_alloc(&queue->pipes[i].cond);
   }
 
   silc_atomic_init32(&queue->connected, 1);
   }
 
   silc_atomic_init32(&queue->connected, 1);
@@ -71,106 +88,185 @@ SilcThreadQueue silc_thread_queue_alloc(void)
 
 void silc_thread_queue_connect(SilcThreadQueue queue)
 {
 
 void silc_thread_queue_connect(SilcThreadQueue queue)
 {
+  SILC_LOG_DEBUG(("Connect to thread queue %p", queue));
   silc_atomic_add_int32(&queue->connected, 1);
 }
 
 /* Disconnect current thread from queue */
 
   silc_atomic_add_int32(&queue->connected, 1);
 }
 
 /* Disconnect current thread from queue */
 
-void silc_thread_queue_disconnect(SilcThreadQueue queue)
+SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue)
 {
 {
+  SilcUInt32 i;
+  SilcThreadQueueData data;
+
+  SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue));
+
   if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
   if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
-    return;
+    return TRUE;
 
   /* Free queue */
   SILC_LOG_DEBUG(("Free thread queue %p", queue));
 
   /* Free queue */
   SILC_LOG_DEBUG(("Free thread queue %p", queue));
-  silc_cond_free(queue->cond);
-  silc_mutex_free(queue->lock);
-  silc_dlist_uninit(queue->queue);
+
+  for (i = 0; i < queue->num_pipes; i++) {
+    silc_cond_free(queue->pipes[i].cond);
+    silc_mutex_free(queue->pipes[i].lock);
+    silc_list_start(queue->pipes[i].queue);
+    while ((data = silc_list_get(queue->pipes[i].queue)))
+      silc_free(data);
+    silc_list_start(queue->pipes[i].freelist);
+    while ((data = silc_list_get(queue->pipes[i].freelist)))
+      silc_free(data);
+  }
+
+  silc_free(queue->pipes);
   silc_atomic_uninit32(&queue->connected);
   silc_free(queue);
   silc_atomic_uninit32(&queue->connected);
   silc_free(queue);
+
+  return FALSE;
 }
 
 /* Push data to queue */
 
 }
 
 /* Push data to queue */
 
-void silc_thread_queue_push(SilcThreadQueue queue, void *data)
+void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
+                           SilcBool demux)
 {
 {
+  SilcThreadQueueData d;
+  SilcUInt32 i;
+
   if (silc_unlikely(!data))
     return;
 
   if (silc_unlikely(!data))
     return;
 
-  SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue));
+  SILC_ASSERT(pipe_index < queue->num_pipes);
+
+  SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s",
+                 data, queue, pipe_index, demux ? "yes" : "no"));
+
+  silc_mutex_lock(queue->pipes[pipe_index].lock);
+
+  d = silc_list_pop(queue->pipes[pipe_index].freelist);
+  if (!d) {
+    d = silc_calloc(1, sizeof(*d));
+    if (!d)
+      return;
+  }
+  d->data = data;
+
+  if (demux) {
+    for (i = 0; i < queue->num_pipes; i++) {
+      if (queue->fifo)
+       silc_list_add(queue->pipes[i].queue, d);
+      else
+       silc_list_insert(queue->pipes[i].queue, NULL, d);
+    }
+  } else {
+    if (queue->fifo)
+      silc_list_add(queue->pipes[pipe_index].queue, d);
+    else
+      silc_list_insert(queue->pipes[pipe_index].queue, NULL, d);
+  }
 
 
-  silc_mutex_lock(queue->lock);
-  silc_dlist_start(queue->queue);
-  silc_dlist_insert(queue->queue, data);
-  silc_cond_broadcast(queue->cond);
-  silc_mutex_unlock(queue->lock);
+  silc_cond_broadcast(queue->pipes[pipe_index].cond);
+  silc_mutex_unlock(queue->pipes[pipe_index].lock);
 }
 
 /* Get data or wait if wanted or return NULL. */
 
 }
 
 /* Get data or wait if wanted or return NULL. */
 
-void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block)
+void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+                           SilcBool block)
 {
 {
+  SilcThreadQueueData d;
   void *data;
 
   void *data;
 
-  if (block)
-    return silc_thread_queue_timed_pop(queue, 0);
-
-  silc_mutex_lock(queue->lock);
-
-  silc_dlist_start(queue->queue);
-  data = silc_dlist_get(queue->queue);
-  if (data)
-    silc_dlist_del(queue->queue, data);
+  SILC_ASSERT(pipe_index < queue->num_pipes);
+
+  silc_mutex_lock(queue->pipes[pipe_index].lock);
+
+  if (block) {
+    /* Block */
+    while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
+      silc_cond_wait(queue->pipes[pipe_index].cond,
+                    queue->pipes[pipe_index].lock);
+    silc_list_add(queue->pipes[pipe_index].freelist, d);
+    data = d->data;
+  } else {
+    /* No blocking */
+    d = silc_list_pop(queue->pipes[pipe_index].queue);
+    data = NULL;
+    if (d) {
+      silc_list_add(queue->pipes[pipe_index].freelist, d);
+      data = d->data;
+    }
+  }
 
 
-  SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+  SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data,
+                 queue, pipe_index));
 
 
-  silc_mutex_unlock(queue->lock);
+  silc_mutex_unlock(queue->pipes[pipe_index].lock);
 
   return data;
 }
 
 /* Get data or wait for a while */
 
 
   return data;
 }
 
 /* Get data or wait for a while */
 
-void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
                                  int timeout_msec)
 {
                                  int timeout_msec)
 {
-  void *data;
+  SilcThreadQueueData d;
+  void *data = NULL;
+
+  SILC_ASSERT(pipe_index < queue->num_pipes);
 
 
-  silc_mutex_lock(queue->lock);
+  silc_mutex_lock(queue->pipes[pipe_index].lock);
 
 
-  silc_dlist_start(queue->queue);
-  while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) {
-    if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec))
+  while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
+    if (!silc_cond_timedwait(queue->pipes[pipe_index].cond,
+                            queue->pipes[pipe_index].lock, timeout_msec))
       break;
       break;
-    silc_dlist_start(queue->queue);
-  }
 
 
-  if (data)
-    silc_dlist_del(queue->queue, data);
+  if (d) {
+    silc_list_add(queue->pipes[pipe_index].freelist, d);
+    data = d->data;
+  }
 
 
-  SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+  SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue,
+                 pipe_index));
 
 
-  silc_mutex_unlock(queue->lock);
+  silc_mutex_unlock(queue->pipes[pipe_index].lock);
 
   return data;
 }
 
 /* Pop entire queue */
 
 
   return data;
 }
 
 /* Pop entire queue */
 
-SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block)
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
+                                    SilcBool block)
 {
 {
+  SilcThreadQueueData d;
   SilcDList list;
 
   SilcDList list;
 
-  silc_mutex_lock(queue->lock);
+  SILC_ASSERT(pipe_index < queue->num_pipes);
+
+  silc_mutex_lock(queue->pipes[pipe_index].lock);
 
   if (block)
 
   if (block)
-    while (silc_dlist_count(queue->queue) == 0)
-      silc_cond_wait(queue->cond, queue->lock);
+    while (silc_list_count(queue->pipes[pipe_index].queue) == 0)
+      silc_cond_wait(queue->pipes[pipe_index].cond,
+                    queue->pipes[pipe_index].lock);
+
+  list = silc_dlist_init();
+  if (!list)
+    return NULL;
+
+  silc_list_start(queue->pipes[pipe_index].queue);
+  while ((d = silc_list_get(queue->pipes[pipe_index].queue))) {
+    silc_dlist_add(list, d->data);
+    silc_list_add(queue->pipes[pipe_index].freelist, d);
+  }
 
 
-  list = queue->queue;
-  queue->queue = silc_dlist_init();
+  silc_list_init(queue->pipes[pipe_index].queue,
+                struct SilcThreadQueueDataStruct, next);
 
 
-  silc_mutex_unlock(queue->lock);
+  silc_mutex_unlock(queue->pipes[pipe_index].lock);
 
   silc_dlist_start(list);
 
 
   silc_dlist_start(list);
 
index 87cf7445b9b2fe9ccb22d5a682ddca997c0064e5..baf0e980b7ad16ddc0d70d563f4dee3e959bcb9b 100644 (file)
  * takes the data from the queue or blocks until more data is available
  * in the queue.
  *
  * takes the data from the queue or blocks until more data is available
  * in the queue.
  *
+ * The queue itself can have one ore more pipes, allowing user to use one
+ * queue to pass different information in different pipes, if each pipe
+ * need to be dedicated to specific type of data.
+ *
  * EXAMPLE
  *
  * Thread 1:
  *
  * // Create queue and push data into it
  * EXAMPLE
  *
  * Thread 1:
  *
  * // Create queue and push data into it
- * SilcThreadQueue queue = silc_thread_queue_alloc();
- * silc_thread_queue_push(queue, data);
+ * SilcThreadQueue queue = silc_thread_queue_alloc(1, FALSE);
+ * silc_thread_queue_push(queue, 0, data, FALSE);
  *
  * Thread 2:
  *
  *
  * Thread 2:
  *
@@ -41,7 +45,7 @@
  * silc_thread_queue_connect(queue);
  *
  * // Block here until data is available from the queue
  * silc_thread_queue_connect(queue);
  *
  * // Block here until data is available from the queue
- * data = silc_thread_queue_pop(queue, TRUE);
+ * data = silc_thread_queue_pop(queue, 0, TRUE);
  *
  ***/
 
  *
  ***/
 
@@ -66,7 +70,7 @@ typedef struct SilcThreadQueueStruct *SilcThreadQueue;
  *
  * SYNOPSIS
  *
  *
  * SYNOPSIS
  *
- *    SilcThreadQueue silc_thread_queue_alloc(void);
+ *    SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo);
  *
  * DESCRIPTION
  *
  *
  * DESCRIPTION
  *
@@ -76,8 +80,20 @@ typedef struct SilcThreadQueueStruct *SilcThreadQueue;
  *    queue it must first connect to it by calling silc_thread_queue_connect.
  *    The thread that creates the queue automatically connects to the queue.
  *
  *    queue it must first connect to it by calling silc_thread_queue_connect.
  *    The thread that creates the queue automatically connects to the queue.
  *
+ *    The 'num_pipes' specifies the number of pipes that exist in the queue.
+ *    If `num_pipes' is 0, the 0 is ignored and one pipe is created anyway.
+ *    By default, caller should create one pipe, unless more are needed.
+ *    The pipes are referenced by index.  First pipe has index 0, second
+ *    index 1, and so on.  The index is given as argument when pushing
+ *    and popping from the queue.
+ *
+ *    By default data popped from the queue is done in last-in-first-out
+ *    order; the most recently added data is popped first.  If `fifo' is
+ *    set to TRUE the order is first-in-first-out; the first added data is
+ *    popped first.
+ *
  ***/
  ***/
-SilcThreadQueue silc_thread_queue_alloc(void);
+SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo);
 
 /****f* silcutil/silc_thread_queue_connect
  *
 
 /****f* silcutil/silc_thread_queue_connect
  *
@@ -99,7 +115,7 @@ void silc_thread_queue_connect(SilcThreadQueue queue);
  *
  * SYNOPSIS
  *
  *
  * SYNOPSIS
  *
- *    void silc_thread_queue_disconnect(SilcThreadQueue queue);
+ *    SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue);
  *
  * DESCRIPTION
  *
  *
  * DESCRIPTION
  *
@@ -107,30 +123,41 @@ void silc_thread_queue_connect(SilcThreadQueue queue);
  *    called after the thread has finished using the thread queue.
  *
  *    When the last thread has disconnected from the queue the queue is
  *    called after the thread has finished using the thread queue.
  *
  *    When the last thread has disconnected from the queue the queue is
- *    destroyed.
+ *    destroyed and this returns FALSE.  Otherwise this returns TRUE as
+ *    long as there are threads connected to the queue.
  *
  ***/
  *
  ***/
-void silc_thread_queue_disconnect(SilcThreadQueue queue);
+SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue);
 
 /****f* silcutil/silc_thread_queue_push
  *
  * SYNOPSIS
  *
 
 /****f* silcutil/silc_thread_queue_push
  *
  * SYNOPSIS
  *
- *    void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+ *    void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index,
+ *                                void *data, SilcBool demux);
  *
  * DESCRIPTION
  *
  *    Pushes the `data' into the thread queue.  The data will become
  *
  * DESCRIPTION
  *
  *    Pushes the `data' into the thread queue.  The data will become
- *    immediately available in the queue for other threads.
+ *    immediately available in the queue for other threads.  The `pipe_index'
+ *    specifies the pipe to push the data into.  First pipe has index 0,
+ *    second has index 1, and so on.  If there is only one pipe the index
+ *    is always 0.
+ *
+ *    If the `demux' is TRUE this will perform demuxing; data pushed to one
+ *    pipe will be pushed to all pipes.  In this case the `pipe_index' is
+ *    ignored.  Each pipe will return the same data when popped.
  *
  ***/
  *
  ***/
-void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
+                           SilcBool demux);
 
 /****f* silcutil/silc_thread_queue_pop
  *
  * SYNOPSIS
  *
 
 /****f* silcutil/silc_thread_queue_pop
  *
  * SYNOPSIS
  *
- *    void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+ *    void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+ *                                SilcBool block);
  *
  * DESCRIPTION
  *
  *
  * DESCRIPTION
  *
@@ -139,15 +166,20 @@ void silc_thread_queue_push(SilcThreadQueue queue, void *data);
  *    If `block' is FALSE and data is not available this will return NULL.
  *    If `block' is TRUE this will never return NULL.
  *
  *    If `block' is FALSE and data is not available this will return NULL.
  *    If `block' is TRUE this will never return NULL.
  *
+ *    The `pipe_index' specifies the pipe from which to pop the data.
+ *    First pipe has index 0, second has index 1, and so on.  If there is
+ *    only one pipe the index is always 0.
+ *
  ***/
  ***/
-void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+                           SilcBool block);
 
 /****f* silcutil/silc_thread_queue_timed_pop
  *
  * SYNOPSIS
  *
  *    void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
 
 /****f* silcutil/silc_thread_queue_timed_pop
  *
  * SYNOPSIS
  *
  *    void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
- *                                      int timeout_msec);
+ *                                      int pipe_index, int timeout_msec);
  *
  * DESCRIPTION
  *
  *
  * DESCRIPTION
  *
@@ -155,8 +187,12 @@ void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
  *    milliseconds for the data to arrive.  If data is not available when
  *    the timeout occurrs this returns NULL.
  *
  *    milliseconds for the data to arrive.  If data is not available when
  *    the timeout occurrs this returns NULL.
  *
+ *    The `pipe_index' specifies the pipe from which to pop the data.
+ *    First pipe has index 0, second has index 1, and so on.  If there is
+ *    only one pipe the index is always 0.
+ *
  ***/
  ***/
-void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
                                  int timeout_msec);
 
 /****f* silcutil/silc_thread_queue_pop_list
                                  int timeout_msec);
 
 /****f* silcutil/silc_thread_queue_pop_list
@@ -164,7 +200,7 @@ void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
  * SYNOPSIS
  *
  *    SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue,
  * SYNOPSIS
  *
  *    SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue,
- *                                         SilcBool block);
+ *                                         int pipe_index, SilcBool block);
  *
  * DESCRIPTION
  *
  *
  * DESCRIPTION
  *
@@ -174,7 +210,12 @@ void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
  *    immediately.  If `block' is TRUE this will block if the queue is
  *    empty.
  *
  *    immediately.  If `block' is TRUE this will block if the queue is
  *    empty.
  *
+ *    The `pipe_index' specifies the pipe from which to pop the list.
+ *    First pipe has index 0, second has index 1, and so on.  If there is
+ *    only one pipe the index is always 0.
+ *
  ***/
  ***/
-SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block);
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
+                                    SilcBool block);
 
 #endif /* SILCTHREADQUEUE_H */
 
 #endif /* SILCTHREADQUEUE_H */
index daecebffc131a3d3f5f9142960ef6d2bc55c5dee..74d3902eb50ffc4fe492d42b82e41d1111ebf88c 100644 (file)
@@ -17,7 +17,7 @@ SILC_FSM_STATE(test_st_start)
 
   SILC_LOG_DEBUG(("test_st_start"));
 
 
   SILC_LOG_DEBUG(("test_st_start"));
 
-  queue = silc_thread_queue_alloc();
+  queue = silc_thread_queue_alloc(1, FALSE);
   if (!queue) {
     silc_fsm_next(fsm, test_st_finish);
     return SILC_FSM_CONTINUE;
   if (!queue) {
     silc_fsm_next(fsm, test_st_finish);
     return SILC_FSM_CONTINUE;
@@ -43,7 +43,7 @@ SILC_FSM_STATE(test_st_wait)
   SILC_LOG_DEBUG(("Wait for data"));
 
   /* Wait for data */
   SILC_LOG_DEBUG(("Wait for data"));
 
   /* Wait for data */
-  data = silc_thread_queue_pop(queue, TRUE);
+  data = silc_thread_queue_pop(queue, 0, TRUE);
   if (!data || data != (void *)100) {
     silc_fsm_next(fsm, test_st_finish);
     return SILC_FSM_CONTINUE;
   if (!data || data != (void *)100) {
     silc_fsm_next(fsm, test_st_finish);
     return SILC_FSM_CONTINUE;
@@ -62,7 +62,7 @@ SILC_FSM_STATE(test_st_thread_start)
 
   /* Send data */
   SILC_LOG_DEBUG(("Send data"));
 
   /* Send data */
   SILC_LOG_DEBUG(("Send data"));
-  silc_thread_queue_push(queue, (void *)100);
+  silc_thread_queue_push(queue, 0, (void *)100, FALSE);
 
   silc_thread_queue_disconnect(queue);
   return SILC_FSM_FINISH;
 
   silc_thread_queue_disconnect(queue);
   return SILC_FSM_FINISH;
@@ -90,6 +90,8 @@ int main(int argc, char **argv)
 {
   SilcFSM fsm;
 
 {
   SilcFSM fsm;
 
+  silc_runtime_init();
+
   if (argc > 1 && !strcmp(argv[1], "-d")) {
     silc_log_debug(TRUE);
     silc_log_debug_hexdump(TRUE);
   if (argc > 1 && !strcmp(argv[1], "-d")) {
     silc_log_debug(TRUE);
     silc_log_debug_hexdump(TRUE);
@@ -116,5 +118,7 @@ int main(int argc, char **argv)
   SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE"));
   fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE");
 
   SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE"));
   fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE");
 
+  silc_runtime_uninit();
+
   return !success;
 }
   return !success;
 }