Added SILC Thread Queue API
authorPekka Riikonen <priikone@silcnet.org>
Fri, 18 Jan 2008 14:46:32 +0000 (14:46 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Fri, 18 Jan 2008 14:46:32 +0000 (14:46 +0000)
CHANGES.RUNTIME
lib/silcutil/silcthreadqueue.c [new file with mode: 0644]
lib/silcutil/silcthreadqueue.h [new file with mode: 0644]
lib/silcutil/tests/test_silcthreadqueue.c [new file with mode: 0644]

index e916899844116e371b79547e70b01106981df8d8..d62f668f5e9d20e3a654f3c9b4f471706bdf8094 100644 (file)
@@ -1,3 +1,7 @@
+Thu Jan 17 16:40:49 EET 2008  Pekka Riikonen <priikone@silcnet.org>
+
+       * Added SILC Thread Queue API to lib/silcutil/silcthreadqueue.[ch].
+
 Tue Jan 15 19:44:36 EET 2008  Pekka Riikonen <priikone@silcnet.org>
 
        * Added SILC Dir API to lib/silcutil/silcdir.h.  Implemented it
diff --git a/lib/silcutil/silcthreadqueue.c b/lib/silcutil/silcthreadqueue.c
new file mode 100644 (file)
index 0000000..1ca9ef3
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+
+  silcthreadqueue.c
+
+  Author: Pekka Riikonen <priikone@silcnet.org>
+
+  Copyright (C) 2008 Pekka Riikonen
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+*/
+
+#include "silcruntime.h"
+
+/************************** Types and definitions ***************************/
+
+/* Thread queue context */
+struct SilcThreadQueueStruct {
+  SilcDList queue;             /* The queue */
+  SilcMutex lock;              /* Queue lock */
+  SilcCond cond;               /* Condition for waiting */
+  SilcAtomic32 connected;      /* Number of connected threads */
+};
+
+/************************** SILC Thread Queue API ***************************/
+
+/* Allocate thread queue */
+
+SilcThreadQueue silc_thread_queue_alloc(void)
+{
+  SilcThreadQueue queue;
+
+  queue = silc_calloc(1, sizeof(*queue));
+  if (!queue)
+    return NULL;
+
+  SILC_LOG_DEBUG(("Allocated thread queue %p", queue));
+
+  if (!silc_mutex_alloc(&queue->lock)) {
+    silc_free(queue);
+    return NULL;
+  }
+
+  if (!silc_cond_alloc(&queue->cond)) {
+    silc_mutex_free(queue->lock);
+    silc_free(queue);
+    return NULL;
+  }
+
+  queue->queue = silc_dlist_init();
+  if (!queue->queue) {
+    silc_cond_free(queue->cond);
+    silc_mutex_free(queue->lock);
+    silc_free(queue);
+    return NULL;
+  }
+
+  silc_atomic_init32(&queue->connected, 1);
+
+  return queue;
+}
+
+/* Connect current thread to queue */
+
+void silc_thread_queue_connect(SilcThreadQueue queue)
+{
+  silc_atomic_add_int32(&queue->connected, 1);
+}
+
+/* Disconnect current thread from queue */
+
+void silc_thread_queue_disconnect(SilcThreadQueue queue)
+{
+  if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
+    return;
+
+  /* Free queue */
+  SILC_LOG_DEBUG(("Free thread queue %p", queue));
+  silc_cond_free(queue->cond);
+  silc_mutex_free(queue->lock);
+  silc_dlist_uninit(queue->queue);
+  silc_atomic_uninit32(&queue->connected);
+  silc_free(queue);
+}
+
+/* Push data to queue */
+
+void silc_thread_queue_push(SilcThreadQueue queue, void *data)
+{
+  if (silc_unlikely(!data))
+    return;
+
+  SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue));
+
+  silc_mutex_lock(queue->lock);
+  silc_dlist_start(queue->queue);
+  silc_dlist_insert(queue->queue, data);
+  silc_cond_broadcast(queue->cond);
+  silc_mutex_unlock(queue->lock);
+}
+
+/* Get data or wait if wanted or return NULL. */
+
+void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block)
+{
+  void *data;
+
+  if (block)
+    return silc_thread_queue_timed_pop(queue, 0);
+
+  silc_mutex_lock(queue->lock);
+
+  silc_dlist_start(queue->queue);
+  data = silc_dlist_get(queue->queue);
+  if (data)
+    silc_dlist_del(queue->queue, data);
+
+  SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+
+  silc_mutex_unlock(queue->lock);
+
+  return data;
+}
+
+/* Get data or wait for a while */
+
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+                                 int timeout_msec)
+{
+  void *data;
+
+  silc_mutex_lock(queue->lock);
+
+  silc_dlist_start(queue->queue);
+  while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) {
+    if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec))
+      break;
+    silc_dlist_start(queue->queue);
+  }
+
+  if (data)
+    silc_dlist_del(queue->queue, data);
+
+  SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+
+  silc_mutex_unlock(queue->lock);
+
+  return data;
+}
+
+/* Pop entire queue */
+
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block)
+{
+  SilcDList list;
+
+  silc_mutex_lock(queue->lock);
+
+  if (block)
+    while (silc_dlist_count(queue->queue) == 0)
+      silc_cond_wait(queue->cond, queue->lock);
+
+  list = queue->queue;
+  queue->queue = silc_dlist_init();
+
+  silc_mutex_unlock(queue->lock);
+
+  silc_dlist_start(list);
+
+  return list;
+}
diff --git a/lib/silcutil/silcthreadqueue.h b/lib/silcutil/silcthreadqueue.h
new file mode 100644 (file)
index 0000000..c6355a8
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+
+  silcthreadqueue.h
+
+  Author: Pekka Riikonen <priikone@silcnet.org>
+
+  Copyright (C) 2008 Pekka Riikonen
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+*/
+
+/****h* silcutil/SILC Thread Queue Interface
+ *
+ * DESCRIPTION
+ *
+ * This interface provides asynchronous thread queues that can be used to
+ * pass messages and data between two or more threads.  Typically a thread
+ * would create the queue, push data into the queue and some other thread
+ * takes the data from the queue or blocks until more data is available
+ * in the queue.
+ *
+ * EXAMPLE
+ *
+ * Thread 1:
+ *
+ * // Create queue and push data into it
+ * SilcThreadQueue queue = silc_thread_queue_alloc();
+ * silc_thread_queue_push(queue, data);
+ *
+ * Thread 2:
+ *
+ * // Connect to the queue
+ * silc_thread_queue_connect(queue);
+ *
+ * // Block here until data is available from the queue
+ * data = silc_thread_queue_pop(queue, TRUE);
+ *
+ ***/
+
+#ifndef SILCTHREADQUEUE_H
+#define SILCTHREADQUEUE_H
+
+/****s* silcutil/SilcThreadQueueAPI/SilcThreadQueue
+ *
+ * NAME
+ *
+ *    typedef struct SilcThreadQueueStruct *SilcThreadQueue;
+ *
+ * DESCRIPTION
+ *
+ *    The thread queue context allocated by silc_thread_queue_alloc and
+ *    given as argument to all silc_thread_queue_* functions.
+ *
+ ***/
+typedef struct SilcThreadQueueStruct *SilcThreadQueue;
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_alloc
+ *
+ * SYNOPSIS
+ *
+ *    SilcThreadQueue silc_thread_queue_alloc(void);
+ *
+ * DESCRIPTION
+ *
+ *    Allocates new thread queue context and returns it.  Returns NULL in
+ *    case of error and sets the silc_errno.  The returned context is
+ *    immediately ready to be used.  For a thread to be able to use the
+ *    queue it must first connect to it by calling silc_thread_queue_connect.
+ *    The thread that creates the queue automatically connects to the queue.
+ *
+ ***/
+SilcThreadQueue silc_thread_queue_alloc(void);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_connect
+ *
+ * SYNOPSIS
+ *
+ *    SilcBool silc_thread_queue_connect(SilcThreadQueue queue);
+ *
+ * DESCRIPTION
+ *
+ *    Connects current thread to the thread queue.  This function must
+ *    be called by each thread wanting to use the thread queue.  After the
+ *    thread is finished using the queue it must disconnect from the queue
+ *    by calling silc_thread_queue_disconnect.
+ *
+ ***/
+void silc_thread_queue_connect(SilcThreadQueue queue);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_disconnect
+ *
+ * SYNOPSIS
+ *
+ *    void silc_thread_queue_disconnect(SilcThreadQueue queue);
+ *
+ * DESCRIPTION
+ *
+ *    Disconnects the current thread from the thread queue.  This must be
+ *    called after the thread has finished using the thread queue.
+ *
+ *    When the last thread has disconnected from the queue the queue is
+ *    destroyed.
+ *
+ ***/
+void silc_thread_queue_disconnect(SilcThreadQueue queue);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_push
+ *
+ * SYNOPSIS
+ *
+ *    void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+ *
+ * DESCRIPTION
+ *
+ *    Pushes the `data' into the thread queue.  The data will become
+ *    immediately available in the queue for other threads.
+ *
+ ***/
+void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop
+ *
+ * SYNOPSIS
+ *
+ *    void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+ *
+ * DESCRIPTION
+ *
+ *    Takes data from the queue and returns it.  If `block' is TRUE and
+ *    data is not available this will block until data becomes available.
+ *    If `block' is FALSE and data is not available this will return NULL.
+ *    If `block' is TRUE this will never return NULL.
+ *
+ ***/
+void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_timed_pop
+ *
+ * SYNOPSIS
+ *
+ *    void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+ *                                      int timeout_msec);
+ *
+ * DESCRIPTION
+ *
+ *    Takes data from the thread queue or waits at most `timeout_msec'
+ *    milliseconds for the data to arrive.  If data is not available when
+ *    the timeout occurrs this returns NULL.
+ *
+ ***/
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+                                 int timeout_msec);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop_list
+ *
+ * SYNOPSIS
+ *
+ *    SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue,
+ *                                         SilcBool block);
+ *
+ * DESCRIPTION
+ *
+ *    Takes everything from the queue and returns the data in a list.  The
+ *    caller must free the returned list with silc_dlist_uninit.  If the
+ *    `block' is FALSE this will never block but will return the queue
+ *    immediately.  If `block' is TRUE this will block if the queue is
+ *    empty.
+ *
+ ***/
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block);
+
+#endif /* SILCTHREADQUEUE_H */
diff --git a/lib/silcutil/tests/test_silcthreadqueue.c b/lib/silcutil/tests/test_silcthreadqueue.c
new file mode 100644 (file)
index 0000000..daecebf
--- /dev/null
@@ -0,0 +1,120 @@
+/* SilcThreadQueue tests */
+
+#include "silcruntime.h"
+
+SilcSchedule schedule;
+SilcThreadQueue queue;
+SilcBool success = FALSE;
+
+SILC_FSM_STATE(test_st_start);
+SILC_FSM_STATE(test_st_wait);
+SILC_FSM_STATE(test_st_thread_start);
+SILC_FSM_STATE(test_st_finish);
+
+SILC_FSM_STATE(test_st_start)
+{
+  SilcFSMThread thread;
+
+  SILC_LOG_DEBUG(("test_st_start"));
+
+  queue = silc_thread_queue_alloc();
+  if (!queue) {
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  thread = silc_fsm_thread_alloc(fsm, NULL, NULL, NULL, TRUE);
+  if (!thread) {
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  silc_fsm_start(thread, test_st_thread_start);
+  silc_fsm_set_state_context(fsm, thread);
+
+  silc_fsm_next(fsm, test_st_wait);
+  return SILC_FSM_YIELD;
+}
+
+SILC_FSM_STATE(test_st_wait)
+{
+  void *data;
+
+  SILC_LOG_DEBUG(("Wait for data"));
+
+  /* Wait for data */
+  data = silc_thread_queue_pop(queue, TRUE);
+  if (!data || data != (void *)100) {
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  success = TRUE;
+  silc_fsm_next(fsm, test_st_finish);
+  SILC_FSM_THREAD_WAIT(state_context);
+}
+
+SILC_FSM_STATE(test_st_thread_start)
+{
+  silc_thread_queue_connect(queue);
+
+  sleep(1);
+
+  /* Send data */
+  SILC_LOG_DEBUG(("Send data"));
+  silc_thread_queue_push(queue, (void *)100);
+
+  silc_thread_queue_disconnect(queue);
+  return SILC_FSM_FINISH;
+}
+
+SILC_FSM_STATE(test_st_finish)
+{
+  SILC_LOG_DEBUG(("test_st_finish"));
+
+  silc_thread_queue_disconnect(queue);
+
+  SILC_LOG_DEBUG(("Finish machine"));
+  return SILC_FSM_FINISH;
+}
+
+static void destructor(SilcFSM fsm, void *fsm_context,
+                      void *destructor_context)
+{
+  SILC_LOG_DEBUG(("FSM destructor, stopping scheduler"));
+  silc_fsm_free(fsm);
+  silc_schedule_stop(schedule);
+}
+
+int main(int argc, char **argv)
+{
+  SilcFSM fsm;
+
+  if (argc > 1 && !strcmp(argv[1], "-d")) {
+    silc_log_debug(TRUE);
+    silc_log_debug_hexdump(TRUE);
+    silc_log_set_debug_string("*thread*");
+  }
+
+  SILC_LOG_DEBUG(("Allocating scheduler"));
+  schedule = silc_schedule_init(0, NULL, NULL, NULL);
+  if (!schedule)
+    goto err;
+
+  SILC_LOG_DEBUG(("Allocating FSM context"));
+  fsm = silc_fsm_alloc(NULL, destructor, NULL, schedule);
+  if (!fsm)
+    goto err;
+  silc_fsm_start(fsm, test_st_start);
+
+  SILC_LOG_DEBUG(("Running scheduler"));
+  silc_schedule(schedule);
+
+  silc_schedule_uninit(schedule);
+
+ err:
+  SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE"));
+  fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE");
+
+  return !success;
+}