From e7b6c157b80152bf9fb9266e6bdd93f9fb0db776 Mon Sep 17 00:00:00 2001 From: Pekka Riikonen Date: Fri, 18 Jan 2008 14:46:32 +0000 Subject: [PATCH] Added SILC Thread Queue API --- CHANGES.RUNTIME | 4 + lib/silcutil/silcthreadqueue.c | 178 +++++++++++++++++++++ lib/silcutil/silcthreadqueue.h | 180 ++++++++++++++++++++++ lib/silcutil/tests/test_silcthreadqueue.c | 120 +++++++++++++++ 4 files changed, 482 insertions(+) create mode 100644 lib/silcutil/silcthreadqueue.c create mode 100644 lib/silcutil/silcthreadqueue.h create mode 100644 lib/silcutil/tests/test_silcthreadqueue.c diff --git a/CHANGES.RUNTIME b/CHANGES.RUNTIME index e9168998..d62f668f 100644 --- a/CHANGES.RUNTIME +++ b/CHANGES.RUNTIME @@ -1,3 +1,7 @@ +Thu Jan 17 16:40:49 EET 2008 Pekka Riikonen + + * Added SILC Thread Queue API to lib/silcutil/silcthreadqueue.[ch]. + Tue Jan 15 19:44:36 EET 2008 Pekka Riikonen * Added SILC Dir API to lib/silcutil/silcdir.h. Implemented it diff --git a/lib/silcutil/silcthreadqueue.c b/lib/silcutil/silcthreadqueue.c new file mode 100644 index 00000000..1ca9ef3b --- /dev/null +++ b/lib/silcutil/silcthreadqueue.c @@ -0,0 +1,178 @@ +/* + + silcthreadqueue.c + + Author: Pekka Riikonen + + Copyright (C) 2008 Pekka Riikonen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + +*/ + +#include "silcruntime.h" + +/************************** Types and definitions ***************************/ + +/* Thread queue context */ +struct SilcThreadQueueStruct { + SilcDList queue; /* The queue */ + SilcMutex lock; /* Queue lock */ + SilcCond cond; /* Condition for waiting */ + SilcAtomic32 connected; /* Number of connected threads */ +}; + +/************************** SILC Thread Queue API ***************************/ + +/* Allocate thread queue */ + +SilcThreadQueue silc_thread_queue_alloc(void) +{ + SilcThreadQueue queue; + + queue = silc_calloc(1, sizeof(*queue)); + if (!queue) + return NULL; + + SILC_LOG_DEBUG(("Allocated thread queue %p", queue)); + + if (!silc_mutex_alloc(&queue->lock)) { + silc_free(queue); + return NULL; + } + + if (!silc_cond_alloc(&queue->cond)) { + silc_mutex_free(queue->lock); + silc_free(queue); + return NULL; + } + + queue->queue = silc_dlist_init(); + if (!queue->queue) { + silc_cond_free(queue->cond); + silc_mutex_free(queue->lock); + silc_free(queue); + return NULL; + } + + silc_atomic_init32(&queue->connected, 1); + + return queue; +} + +/* Connect current thread to queue */ + +void silc_thread_queue_connect(SilcThreadQueue queue) +{ + silc_atomic_add_int32(&queue->connected, 1); +} + +/* Disconnect current thread from queue */ + +void silc_thread_queue_disconnect(SilcThreadQueue queue) +{ + if (silc_atomic_sub_int32(&queue->connected, 1) > 0) + return; + + /* Free queue */ + SILC_LOG_DEBUG(("Free thread queue %p", queue)); + silc_cond_free(queue->cond); + silc_mutex_free(queue->lock); + silc_dlist_uninit(queue->queue); + silc_atomic_uninit32(&queue->connected); + silc_free(queue); +} + +/* Push data to queue */ + +void silc_thread_queue_push(SilcThreadQueue queue, void *data) +{ + if (silc_unlikely(!data)) + return; + + SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue)); + + silc_mutex_lock(queue->lock); + silc_dlist_start(queue->queue); + silc_dlist_insert(queue->queue, data); + silc_cond_broadcast(queue->cond); + silc_mutex_unlock(queue->lock); +} + +/* Get data or wait if wanted or return NULL. */ + +void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block) +{ + void *data; + + if (block) + return silc_thread_queue_timed_pop(queue, 0); + + silc_mutex_lock(queue->lock); + + silc_dlist_start(queue->queue); + data = silc_dlist_get(queue->queue); + if (data) + silc_dlist_del(queue->queue, data); + + SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + + silc_mutex_unlock(queue->lock); + + return data; +} + +/* Get data or wait for a while */ + +void *silc_thread_queue_timed_pop(SilcThreadQueue queue, + int timeout_msec) +{ + void *data; + + silc_mutex_lock(queue->lock); + + silc_dlist_start(queue->queue); + while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) { + if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec)) + break; + silc_dlist_start(queue->queue); + } + + if (data) + silc_dlist_del(queue->queue, data); + + SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + + silc_mutex_unlock(queue->lock); + + return data; +} + +/* Pop entire queue */ + +SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block) +{ + SilcDList list; + + silc_mutex_lock(queue->lock); + + if (block) + while (silc_dlist_count(queue->queue) == 0) + silc_cond_wait(queue->cond, queue->lock); + + list = queue->queue; + queue->queue = silc_dlist_init(); + + silc_mutex_unlock(queue->lock); + + silc_dlist_start(list); + + return list; +} diff --git a/lib/silcutil/silcthreadqueue.h b/lib/silcutil/silcthreadqueue.h new file mode 100644 index 00000000..c6355a89 --- /dev/null +++ b/lib/silcutil/silcthreadqueue.h @@ -0,0 +1,180 @@ +/* + + silcthreadqueue.h + + Author: Pekka Riikonen + + Copyright (C) 2008 Pekka Riikonen + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + +*/ + +/****h* silcutil/SILC Thread Queue Interface + * + * DESCRIPTION + * + * This interface provides asynchronous thread queues that can be used to + * pass messages and data between two or more threads. Typically a thread + * would create the queue, push data into the queue and some other thread + * takes the data from the queue or blocks until more data is available + * in the queue. + * + * EXAMPLE + * + * Thread 1: + * + * // Create queue and push data into it + * SilcThreadQueue queue = silc_thread_queue_alloc(); + * silc_thread_queue_push(queue, data); + * + * Thread 2: + * + * // Connect to the queue + * silc_thread_queue_connect(queue); + * + * // Block here until data is available from the queue + * data = silc_thread_queue_pop(queue, TRUE); + * + ***/ + +#ifndef SILCTHREADQUEUE_H +#define SILCTHREADQUEUE_H + +/****s* silcutil/SilcThreadQueueAPI/SilcThreadQueue + * + * NAME + * + * typedef struct SilcThreadQueueStruct *SilcThreadQueue; + * + * DESCRIPTION + * + * The thread queue context allocated by silc_thread_queue_alloc and + * given as argument to all silc_thread_queue_* functions. + * + ***/ +typedef struct SilcThreadQueueStruct *SilcThreadQueue; + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_alloc + * + * SYNOPSIS + * + * SilcThreadQueue silc_thread_queue_alloc(void); + * + * DESCRIPTION + * + * Allocates new thread queue context and returns it. Returns NULL in + * case of error and sets the silc_errno. The returned context is + * immediately ready to be used. For a thread to be able to use the + * queue it must first connect to it by calling silc_thread_queue_connect. + * The thread that creates the queue automatically connects to the queue. + * + ***/ +SilcThreadQueue silc_thread_queue_alloc(void); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_connect + * + * SYNOPSIS + * + * SilcBool silc_thread_queue_connect(SilcThreadQueue queue); + * + * DESCRIPTION + * + * Connects current thread to the thread queue. This function must + * be called by each thread wanting to use the thread queue. After the + * thread is finished using the queue it must disconnect from the queue + * by calling silc_thread_queue_disconnect. + * + ***/ +void silc_thread_queue_connect(SilcThreadQueue queue); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_disconnect + * + * SYNOPSIS + * + * void silc_thread_queue_disconnect(SilcThreadQueue queue); + * + * DESCRIPTION + * + * Disconnects the current thread from the thread queue. This must be + * called after the thread has finished using the thread queue. + * + * When the last thread has disconnected from the queue the queue is + * destroyed. + * + ***/ +void silc_thread_queue_disconnect(SilcThreadQueue queue); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_push + * + * SYNOPSIS + * + * void silc_thread_queue_push(SilcThreadQueue queue, void *data); + * + * DESCRIPTION + * + * Pushes the `data' into the thread queue. The data will become + * immediately available in the queue for other threads. + * + ***/ +void silc_thread_queue_push(SilcThreadQueue queue, void *data); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop + * + * SYNOPSIS + * + * void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block); + * + * DESCRIPTION + * + * Takes data from the queue and returns it. If `block' is TRUE and + * data is not available this will block until data becomes available. + * If `block' is FALSE and data is not available this will return NULL. + * If `block' is TRUE this will never return NULL. + * + ***/ +void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_timed_pop + * + * SYNOPSIS + * + * void *silc_thread_queue_timed_pop(SilcThreadQueue queue, + * int timeout_msec); + * + * DESCRIPTION + * + * Takes data from the thread queue or waits at most `timeout_msec' + * milliseconds for the data to arrive. If data is not available when + * the timeout occurrs this returns NULL. + * + ***/ +void *silc_thread_queue_timed_pop(SilcThreadQueue queue, + int timeout_msec); + +/****f* silcutil/SilcThreadQueueAPI/silc_thread_queue_pop_list + * + * SYNOPSIS + * + * SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, + * SilcBool block); + * + * DESCRIPTION + * + * Takes everything from the queue and returns the data in a list. The + * caller must free the returned list with silc_dlist_uninit. If the + * `block' is FALSE this will never block but will return the queue + * immediately. If `block' is TRUE this will block if the queue is + * empty. + * + ***/ +SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block); + +#endif /* SILCTHREADQUEUE_H */ diff --git a/lib/silcutil/tests/test_silcthreadqueue.c b/lib/silcutil/tests/test_silcthreadqueue.c new file mode 100644 index 00000000..daecebff --- /dev/null +++ b/lib/silcutil/tests/test_silcthreadqueue.c @@ -0,0 +1,120 @@ +/* SilcThreadQueue tests */ + +#include "silcruntime.h" + +SilcSchedule schedule; +SilcThreadQueue queue; +SilcBool success = FALSE; + +SILC_FSM_STATE(test_st_start); +SILC_FSM_STATE(test_st_wait); +SILC_FSM_STATE(test_st_thread_start); +SILC_FSM_STATE(test_st_finish); + +SILC_FSM_STATE(test_st_start) +{ + SilcFSMThread thread; + + SILC_LOG_DEBUG(("test_st_start")); + + queue = silc_thread_queue_alloc(); + if (!queue) { + silc_fsm_next(fsm, test_st_finish); + return SILC_FSM_CONTINUE; + } + + thread = silc_fsm_thread_alloc(fsm, NULL, NULL, NULL, TRUE); + if (!thread) { + silc_fsm_next(fsm, test_st_finish); + return SILC_FSM_CONTINUE; + } + + silc_fsm_start(thread, test_st_thread_start); + silc_fsm_set_state_context(fsm, thread); + + silc_fsm_next(fsm, test_st_wait); + return SILC_FSM_YIELD; +} + +SILC_FSM_STATE(test_st_wait) +{ + void *data; + + SILC_LOG_DEBUG(("Wait for data")); + + /* Wait for data */ + data = silc_thread_queue_pop(queue, TRUE); + if (!data || data != (void *)100) { + silc_fsm_next(fsm, test_st_finish); + return SILC_FSM_CONTINUE; + } + + success = TRUE; + silc_fsm_next(fsm, test_st_finish); + SILC_FSM_THREAD_WAIT(state_context); +} + +SILC_FSM_STATE(test_st_thread_start) +{ + silc_thread_queue_connect(queue); + + sleep(1); + + /* Send data */ + SILC_LOG_DEBUG(("Send data")); + silc_thread_queue_push(queue, (void *)100); + + silc_thread_queue_disconnect(queue); + return SILC_FSM_FINISH; +} + +SILC_FSM_STATE(test_st_finish) +{ + SILC_LOG_DEBUG(("test_st_finish")); + + silc_thread_queue_disconnect(queue); + + SILC_LOG_DEBUG(("Finish machine")); + return SILC_FSM_FINISH; +} + +static void destructor(SilcFSM fsm, void *fsm_context, + void *destructor_context) +{ + SILC_LOG_DEBUG(("FSM destructor, stopping scheduler")); + silc_fsm_free(fsm); + silc_schedule_stop(schedule); +} + +int main(int argc, char **argv) +{ + SilcFSM fsm; + + if (argc > 1 && !strcmp(argv[1], "-d")) { + silc_log_debug(TRUE); + silc_log_debug_hexdump(TRUE); + silc_log_set_debug_string("*thread*"); + } + + SILC_LOG_DEBUG(("Allocating scheduler")); + schedule = silc_schedule_init(0, NULL, NULL, NULL); + if (!schedule) + goto err; + + SILC_LOG_DEBUG(("Allocating FSM context")); + fsm = silc_fsm_alloc(NULL, destructor, NULL, schedule); + if (!fsm) + goto err; + silc_fsm_start(fsm, test_st_start); + + SILC_LOG_DEBUG(("Running scheduler")); + silc_schedule(schedule); + + silc_schedule_uninit(schedule); + + err: + SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE")); + fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE"); + + return !success; +} -- 2.24.0