+Thu Jan 17 16:40:49 EET 2008 Pekka Riikonen <priikone@silcnet.org>
+
+ * Added SILC Thread Queue API to lib/silcutil/silcthreadqueue.[ch].
+
Tue Jan 15 19:44:36 EET 2008 Pekka Riikonen <priikone@silcnet.org>
* Added SILC Dir API to lib/silcutil/silcdir.h. Implemented it
--- /dev/null
+/*
+
+ silcthreadqueue.c
+
+ Author: Pekka Riikonen <priikone@silcnet.org>
+
+ Copyright (C) 2008 Pekka Riikonen
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+*/
+
+#include "silcruntime.h"
+
+/************************** Types and definitions ***************************/
+
+/* Thread queue context */
+struct SilcThreadQueueStruct {
+ SilcDList queue; /* The queue */
+ SilcMutex lock; /* Queue lock */
+ SilcCond cond; /* Condition for waiting */
+ SilcAtomic32 connected; /* Number of connected threads */
+};
+
+/************************** SILC Thread Queue API ***************************/
+
+/* Allocate thread queue */
+
+SilcThreadQueue silc_thread_queue_alloc(void)
+{
+ SilcThreadQueue queue;
+
+ queue = silc_calloc(1, sizeof(*queue));
+ if (!queue)
+ return NULL;
+
+ SILC_LOG_DEBUG(("Allocated thread queue %p", queue));
+
+ if (!silc_mutex_alloc(&queue->lock)) {
+ silc_free(queue);
+ return NULL;
+ }
+
+ if (!silc_cond_alloc(&queue->cond)) {
+ silc_mutex_free(queue->lock);
+ silc_free(queue);
+ return NULL;
+ }
+
+ queue->queue = silc_dlist_init();
+ if (!queue->queue) {
+ silc_cond_free(queue->cond);
+ silc_mutex_free(queue->lock);
+ silc_free(queue);
+ return NULL;
+ }
+
+ silc_atomic_init32(&queue->connected, 1);
+
+ return queue;
+}
+
+/* Connect current thread to queue */
+
+void silc_thread_queue_connect(SilcThreadQueue queue)
+{
+ silc_atomic_add_int32(&queue->connected, 1);
+}
+
+/* Disconnect current thread from queue */
+
+void silc_thread_queue_disconnect(SilcThreadQueue queue)
+{
+ if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
+ return;
+
+ /* Free queue */
+ SILC_LOG_DEBUG(("Free thread queue %p", queue));
+ silc_cond_free(queue->cond);
+ silc_mutex_free(queue->lock);
+ silc_dlist_uninit(queue->queue);
+ silc_atomic_uninit32(&queue->connected);
+ silc_free(queue);
+}
+
+/* Push data to queue */
+
+void silc_thread_queue_push(SilcThreadQueue queue, void *data)
+{
+ if (silc_unlikely(!data))
+ return;
+
+ SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue));
+
+ silc_mutex_lock(queue->lock);
+ silc_dlist_start(queue->queue);
+ silc_dlist_insert(queue->queue, data);
+ silc_cond_broadcast(queue->cond);
+ silc_mutex_unlock(queue->lock);
+}
+
+/* Get data or wait if wanted or return NULL. */
+
+void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block)
+{
+ void *data;
+
+ if (block)
+ return silc_thread_queue_timed_pop(queue, 0);
+
+ silc_mutex_lock(queue->lock);
+
+ silc_dlist_start(queue->queue);
+ data = silc_dlist_get(queue->queue);
+ if (data)
+ silc_dlist_del(queue->queue, data);
+
+ SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+
+ silc_mutex_unlock(queue->lock);
+
+ return data;
+}
+
+/* Get data or wait for a while */
+
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+ int timeout_msec)
+{
+ void *data;
+
+ silc_mutex_lock(queue->lock);
+
+ silc_dlist_start(queue->queue);
+ while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) {
+ if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec))
+ break;
+ silc_dlist_start(queue->queue);
+ }
+
+ if (data)
+ silc_dlist_del(queue->queue, data);
+
+ SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+
+ silc_mutex_unlock(queue->lock);
+
+ return data;
+}
+
+/* Pop entire queue */
+
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block)
+{
+ SilcDList list;
+
+ silc_mutex_lock(queue->lock);
+
+ if (block)
+ while (silc_dlist_count(queue->queue) == 0)
+ silc_cond_wait(queue->cond, queue->lock);
+
+ list = queue->queue;
+ queue->queue = silc_dlist_init();
+
+ silc_mutex_unlock(queue->lock);
+
+ silc_dlist_start(list);
+
+ return list;
+}
--- /dev/null
+/*
+
+ silcthreadqueue.h
+
+ Author: Pekka Riikonen <priikone@silcnet.org>
+
+ Copyright (C) 2008 Pekka Riikonen
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+*/
+
+/****h* silcutil/SILC Thread Queue Interface
+ *
+ * DESCRIPTION
+ *
+ * This interface provides asynchronous thread queues that can be used to
+ * pass messages and data between two or more threads. Typically a thread
+ * would create the queue, push data into the queue and some other thread
+ * takes the data from the queue or blocks until more data is available
+ * in the queue.
+ *
+ * EXAMPLE
+ *
+ * Thread 1:
+ *
+ * // Create queue and push data into it
+ * SilcThreadQueue queue = silc_thread_queue_alloc();
+ * silc_thread_queue_push(queue, data);
+ *
+ * Thread 2:
+ *
+ * // Connect to the queue
+ * silc_thread_queue_connect(queue);
+ *
+ * // Block here until data is available from the queue
+ * data = silc_thread_queue_pop(queue, TRUE);
+ *
+ ***/
+
+#ifndef SILCTHREADQUEUE_H
+#define SILCTHREADQUEUE_H
+
+/****s* silcutil/SilcThreadQueueAPI/SilcThreadQueue
+ *
+ * NAME
+ *
+ * typedef struct SilcThreadQueueStruct *SilcThreadQueue;
+ *
+ * DESCRIPTION
+ *
+ * The thread queue context allocated by silc_thread_queue_alloc and
+ * given as argument to all silc_thread_queue_* functions.
+ *
+ ***/
+typedef struct SilcThreadQueueStruct *SilcThreadQueue;
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_alloc
+ *
+ * SYNOPSIS
+ *
+ * SilcThreadQueue silc_thread_queue_alloc(void);
+ *
+ * DESCRIPTION
+ *
+ * Allocates new thread queue context and returns it. Returns NULL in
+ * case of error and sets the silc_errno. The returned context is
+ * immediately ready to be used. For a thread to be able to use the
+ * queue it must first connect to it by calling silc_thread_queue_connect.
+ * The thread that creates the queue automatically connects to the queue.
+ *
+ ***/
+SilcThreadQueue silc_thread_queue_alloc(void);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_connect
+ *
+ * SYNOPSIS
+ *
+ * SilcBool silc_thread_queue_connect(SilcThreadQueue queue);
+ *
+ * DESCRIPTION
+ *
+ * Connects current thread to the thread queue. This function must
+ * be called by each thread wanting to use the thread queue. After the
+ * thread is finished using the queue it must disconnect from the queue
+ * by calling silc_thread_queue_disconnect.
+ *
+ ***/
+void silc_thread_queue_connect(SilcThreadQueue queue);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_disconnect
+ *
+ * SYNOPSIS
+ *
+ * void silc_thread_queue_disconnect(SilcThreadQueue queue);
+ *
+ * DESCRIPTION
+ *
+ * Disconnects the current thread from the thread queue. This must be
+ * called after the thread has finished using the thread queue.
+ *
+ * When the last thread has disconnected from the queue the queue is
+ * destroyed.
+ *
+ ***/
+void silc_thread_queue_disconnect(SilcThreadQueue queue);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_push
+ *
+ * SYNOPSIS
+ *
+ * void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+ *
+ * DESCRIPTION
+ *
+ * Pushes the `data' into the thread queue. The data will become
+ * immediately available in the queue for other threads.
+ *
+ ***/
+void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop
+ *
+ * SYNOPSIS
+ *
+ * void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+ *
+ * DESCRIPTION
+ *
+ * Takes data from the queue and returns it. If `block' is TRUE and
+ * data is not available this will block until data becomes available.
+ * If `block' is FALSE and data is not available this will return NULL.
+ * If `block' is TRUE this will never return NULL.
+ *
+ ***/
+void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_timed_pop
+ *
+ * SYNOPSIS
+ *
+ * void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+ * int timeout_msec);
+ *
+ * DESCRIPTION
+ *
+ * Takes data from the thread queue or waits at most `timeout_msec'
+ * milliseconds for the data to arrive. If data is not available when
+ * the timeout occurrs this returns NULL.
+ *
+ ***/
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+ int timeout_msec);
+
+/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop_list
+ *
+ * SYNOPSIS
+ *
+ * SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue,
+ * SilcBool block);
+ *
+ * DESCRIPTION
+ *
+ * Takes everything from the queue and returns the data in a list. The
+ * caller must free the returned list with silc_dlist_uninit. If the
+ * `block' is FALSE this will never block but will return the queue
+ * immediately. If `block' is TRUE this will block if the queue is
+ * empty.
+ *
+ ***/
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block);
+
+#endif /* SILCTHREADQUEUE_H */
--- /dev/null
+/* SilcThreadQueue tests */
+
+#include "silcruntime.h"
+
+SilcSchedule schedule;
+SilcThreadQueue queue;
+SilcBool success = FALSE;
+
+SILC_FSM_STATE(test_st_start);
+SILC_FSM_STATE(test_st_wait);
+SILC_FSM_STATE(test_st_thread_start);
+SILC_FSM_STATE(test_st_finish);
+
+SILC_FSM_STATE(test_st_start)
+{
+ SilcFSMThread thread;
+
+ SILC_LOG_DEBUG(("test_st_start"));
+
+ queue = silc_thread_queue_alloc();
+ if (!queue) {
+ silc_fsm_next(fsm, test_st_finish);
+ return SILC_FSM_CONTINUE;
+ }
+
+ thread = silc_fsm_thread_alloc(fsm, NULL, NULL, NULL, TRUE);
+ if (!thread) {
+ silc_fsm_next(fsm, test_st_finish);
+ return SILC_FSM_CONTINUE;
+ }
+
+ silc_fsm_start(thread, test_st_thread_start);
+ silc_fsm_set_state_context(fsm, thread);
+
+ silc_fsm_next(fsm, test_st_wait);
+ return SILC_FSM_YIELD;
+}
+
+SILC_FSM_STATE(test_st_wait)
+{
+ void *data;
+
+ SILC_LOG_DEBUG(("Wait for data"));
+
+ /* Wait for data */
+ data = silc_thread_queue_pop(queue, TRUE);
+ if (!data || data != (void *)100) {
+ silc_fsm_next(fsm, test_st_finish);
+ return SILC_FSM_CONTINUE;
+ }
+
+ success = TRUE;
+ silc_fsm_next(fsm, test_st_finish);
+ SILC_FSM_THREAD_WAIT(state_context);
+}
+
+SILC_FSM_STATE(test_st_thread_start)
+{
+ silc_thread_queue_connect(queue);
+
+ sleep(1);
+
+ /* Send data */
+ SILC_LOG_DEBUG(("Send data"));
+ silc_thread_queue_push(queue, (void *)100);
+
+ silc_thread_queue_disconnect(queue);
+ return SILC_FSM_FINISH;
+}
+
+SILC_FSM_STATE(test_st_finish)
+{
+ SILC_LOG_DEBUG(("test_st_finish"));
+
+ silc_thread_queue_disconnect(queue);
+
+ SILC_LOG_DEBUG(("Finish machine"));
+ return SILC_FSM_FINISH;
+}
+
+static void destructor(SilcFSM fsm, void *fsm_context,
+ void *destructor_context)
+{
+ SILC_LOG_DEBUG(("FSM destructor, stopping scheduler"));
+ silc_fsm_free(fsm);
+ silc_schedule_stop(schedule);
+}
+
+int main(int argc, char **argv)
+{
+ SilcFSM fsm;
+
+ if (argc > 1 && !strcmp(argv[1], "-d")) {
+ silc_log_debug(TRUE);
+ silc_log_debug_hexdump(TRUE);
+ silc_log_set_debug_string("*thread*");
+ }
+
+ SILC_LOG_DEBUG(("Allocating scheduler"));
+ schedule = silc_schedule_init(0, NULL, NULL, NULL);
+ if (!schedule)
+ goto err;
+
+ SILC_LOG_DEBUG(("Allocating FSM context"));
+ fsm = silc_fsm_alloc(NULL, destructor, NULL, schedule);
+ if (!fsm)
+ goto err;
+ silc_fsm_start(fsm, test_st_start);
+
+ SILC_LOG_DEBUG(("Running scheduler"));
+ silc_schedule(schedule);
+
+ silc_schedule_uninit(schedule);
+
+ err:
+ SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE"));
+ fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE");
+
+ return !success;
+}