From 76c779783817ba7cdbe67ca83971134ce70e662f Mon Sep 17 00:00:00 2001 From: Pekka Riikonen Date: Sat, 23 Feb 2008 15:46:52 +0200 Subject: [PATCH] Major rewrite of the SilcThreadQueue API The new API allows creation of queues with multiple pipes. Each pipe is independently lockable and waitable. A queue with one pipe is the same as the old SilcThreadQueue. The queue now also supports FIFO order when popping data from the pipes. By default the order is LIFO. --- TODO | 17 +- lib/silcutil/silcthreadqueue.c | 220 ++++++++++++++++------ lib/silcutil/silcthreadqueue.h | 75 ++++++-- lib/silcutil/tests/test_silcthreadqueue.c | 10 +- 4 files changed, 237 insertions(+), 85 deletions(-) diff --git a/TODO b/TODO index 7f116c95..e907469a 100644 --- a/TODO +++ b/TODO @@ -164,10 +164,21 @@ Runtime library, lib/silcutil/ rwlock implementation using atomic operations.) not for now. -lib/silcutil/symbian/ -===================== +Windows Support +=============== + + +Symbian OS Support +================== o Something needs to be thought to the logging globals as well, like silc_debug etc. They won't work on EPOC. Perhaps logging and debugging is to be disabled on EPOC. The logging currently works - by it cannot be controlled, same with debugging. + but it cannot be controlled, same with debugging. SILC Global API + MUST be used with all globals on Symbian. + + o gethostname() returns "Function not implemented". Others may return + the same. We should fix that probably to use RHostResolver and + GetHostName(). + + o silc_thread_exit should call User::Exit(). 
diff --git a/lib/silcutil/silcthreadqueue.c b/lib/silcutil/silcthreadqueue.c index 1ca9ef3b..324063fc 100644 --- a/lib/silcutil/silcthreadqueue.c +++ b/lib/silcutil/silcthreadqueue.c @@ -21,45 +21,62 @@ /************************** Types and definitions ***************************/ -/* Thread queue context */ -struct SilcThreadQueueStruct { - SilcDList queue; /* The queue */ +/* Queue data context */ +typedef struct SilcThreadQueueDataStruct { + struct SilcThreadQueueDataStruct *next; + void *data; /* User data */ +} *SilcThreadQueueData; + +/* Pipe */ +typedef struct SilcThreadQueuePipeStruct { + SilcList queue; /* The queue */ + SilcList freelist; /* Free list of queue data contexts */ SilcMutex lock; /* Queue lock */ SilcCond cond; /* Condition for waiting */ +} *SilcThreadQueuePipe; + +/* Thread queue context */ +struct SilcThreadQueueStruct { + SilcThreadQueuePipe pipes; /* Queue pipes */ SilcAtomic32 connected; /* Number of connected threads */ + unsigned int num_pipes : 31; /* Number of pipes */ + unsigned int fifo : 1; /* FIFO */ }; /************************** SILC Thread Queue API ***************************/ /* Allocate thread queue */ -SilcThreadQueue silc_thread_queue_alloc(void) +SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo) { SilcThreadQueue queue; + SilcUInt32 i; + + if (!num_pipes) + num_pipes = 1; queue = silc_calloc(1, sizeof(*queue)); if (!queue) return NULL; - SILC_LOG_DEBUG(("Allocated thread queue %p", queue)); - - if (!silc_mutex_alloc(&queue->lock)) { - silc_free(queue); - return NULL; - } + SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue, + num_pipes, fifo ? 
"FIFO" : "")); - if (!silc_cond_alloc(&queue->cond)) { - silc_mutex_free(queue->lock); + queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes)); + if (!queue->pipes) { silc_free(queue); return NULL; } - - queue->queue = silc_dlist_init(); - if (!queue->queue) { - silc_cond_free(queue->cond); - silc_mutex_free(queue->lock); - silc_free(queue); - return NULL; + queue->num_pipes = num_pipes; + queue->fifo = fifo; + + for (i = 0; i < num_pipes; i++) { + silc_list_init(queue->pipes[i].queue, + struct SilcThreadQueueDataStruct, next); + silc_list_init(queue->pipes[i].freelist, + struct SilcThreadQueueDataStruct, next); + silc_mutex_alloc(&queue->pipes[i].lock); + silc_cond_alloc(&queue->pipes[i].cond); } silc_atomic_init32(&queue->connected, 1); @@ -71,106 +88,185 @@ SilcThreadQueue silc_thread_queue_alloc(void) void silc_thread_queue_connect(SilcThreadQueue queue) { + SILC_LOG_DEBUG(("Connect to thread queue %p", queue)); silc_atomic_add_int32(&queue->connected, 1); } /* Disconnect current thread from queue */ -void silc_thread_queue_disconnect(SilcThreadQueue queue) +SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue) { + SilcUInt32 i; + SilcThreadQueueData data; + + SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue)); + if (silc_atomic_sub_int32(&queue->connected, 1) > 0) - return; + return TRUE; /* Free queue */ SILC_LOG_DEBUG(("Free thread queue %p", queue)); - silc_cond_free(queue->cond); - silc_mutex_free(queue->lock); - silc_dlist_uninit(queue->queue); + + for (i = 0; i < queue->num_pipes; i++) { + silc_cond_free(queue->pipes[i].cond); + silc_mutex_free(queue->pipes[i].lock); + silc_list_start(queue->pipes[i].queue); + while ((data = silc_list_get(queue->pipes[i].queue))) + silc_free(data); + silc_list_start(queue->pipes[i].freelist); + while ((data = silc_list_get(queue->pipes[i].freelist))) + silc_free(data); + } + + silc_free(queue->pipes); silc_atomic_uninit32(&queue->connected); silc_free(queue); + + return FALSE; } /* Push data to 
queue */ -void silc_thread_queue_push(SilcThreadQueue queue, void *data) +void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data, + SilcBool demux) { + SilcThreadQueueData d; + SilcUInt32 i; + if (silc_unlikely(!data)) return; - SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue)); + SILC_ASSERT(pipe_index < queue->num_pipes); + + SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s", + data, queue, pipe_index, demux ? "yes" : "no")); + + silc_mutex_lock(queue->pipes[pipe_index].lock); + + d = silc_list_pop(queue->pipes[pipe_index].freelist); + if (!d) { + d = silc_calloc(1, sizeof(*d)); + if (!d) + return; + } + d->data = data; + + if (demux) { + for (i = 0; i < queue->num_pipes; i++) { + if (queue->fifo) + silc_list_add(queue->pipes[i].queue, d); + else + silc_list_insert(queue->pipes[i].queue, NULL, d); + } + } else { + if (queue->fifo) + silc_list_add(queue->pipes[pipe_index].queue, d); + else + silc_list_insert(queue->pipes[pipe_index].queue, NULL, d); + } - silc_mutex_lock(queue->lock); - silc_dlist_start(queue->queue); - silc_dlist_insert(queue->queue, data); - silc_cond_broadcast(queue->cond); - silc_mutex_unlock(queue->lock); + silc_cond_broadcast(queue->pipes[pipe_index].cond); + silc_mutex_unlock(queue->pipes[pipe_index].lock); } /* Get data or wait if wanted or return NULL. 
*/ -void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block) +void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index, + SilcBool block) { + SilcThreadQueueData d; void *data; - if (block) - return silc_thread_queue_timed_pop(queue, 0); - - silc_mutex_lock(queue->lock); - - silc_dlist_start(queue->queue); - data = silc_dlist_get(queue->queue); - if (data) - silc_dlist_del(queue->queue, data); + SILC_ASSERT(pipe_index < queue->num_pipes); + + silc_mutex_lock(queue->pipes[pipe_index].lock); + + if (block) { + /* Block */ + while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL) + silc_cond_wait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock); + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } else { + /* No blocking */ + d = silc_list_pop(queue->pipes[pipe_index].queue); + data = NULL; + if (d) { + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } + } - SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, + queue, pipe_index)); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); return data; } /* Get data or wait for a while */ -void *silc_thread_queue_timed_pop(SilcThreadQueue queue, +void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index, int timeout_msec) { - void *data; + SilcThreadQueueData d; + void *data = NULL; + + SILC_ASSERT(pipe_index < queue->num_pipes); - silc_mutex_lock(queue->lock); + silc_mutex_lock(queue->pipes[pipe_index].lock); - silc_dlist_start(queue->queue); - while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) { - if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec)) + while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL) + if (!silc_cond_timedwait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock, timeout_msec)) break; - silc_dlist_start(queue->queue); - } - if 
(data) - silc_dlist_del(queue->queue, data); + if (d) { + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } - SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue, + pipe_index)); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); return data; } /* Pop entire queue */ -SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block) +SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index, + SilcBool block) { + SilcThreadQueueData d; SilcDList list; - silc_mutex_lock(queue->lock); + SILC_ASSERT(pipe_index < queue->num_pipes); + + silc_mutex_lock(queue->pipes[pipe_index].lock); if (block) - while (silc_dlist_count(queue->queue) == 0) - silc_cond_wait(queue->cond, queue->lock); + while (silc_list_count(queue->pipes[pipe_index].queue) == 0) + silc_cond_wait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock); + + list = silc_dlist_init(); + if (!list) + return NULL; + + silc_list_start(queue->pipes[pipe_index].queue); + while ((d = silc_list_get(queue->pipes[pipe_index].queue))) { + silc_dlist_add(list, d->data); + silc_list_add(queue->pipes[pipe_index].freelist, d); + } - list = queue->queue; - queue->queue = silc_dlist_init(); + silc_list_init(queue->pipes[pipe_index].queue, + struct SilcThreadQueueDataStruct, next); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); silc_dlist_start(list); diff --git a/lib/silcutil/silcthreadqueue.h b/lib/silcutil/silcthreadqueue.h index 87cf7445..baf0e980 100644 --- a/lib/silcutil/silcthreadqueue.h +++ b/lib/silcutil/silcthreadqueue.h @@ -27,13 +27,17 @@ * takes the data from the queue or blocks until more data is available * in the queue. 
* + * The queue itself can have one or more pipes, allowing the user to use one + * queue to pass different information in different pipes, if each pipe + * needs to be dedicated to a specific type of data. + * * EXAMPLE * * Thread 1: * * // Create queue and push data into it - * SilcThreadQueue queue = silc_thread_queue_alloc(); - * silc_thread_queue_push(queue, data); + * SilcThreadQueue queue = silc_thread_queue_alloc(1, FALSE); + * silc_thread_queue_push(queue, 0, data, FALSE); * * Thread 2: * @@ -41,7 +45,7 @@ * silc_thread_queue_connect(queue); * * // Block here until data is available from the queue - * data = silc_thread_queue_pop(queue, TRUE); + * data = silc_thread_queue_pop(queue, 0, TRUE); * ***/ @@ -66,7 +70,7 @@ typedef struct SilcThreadQueueStruct *SilcThreadQueue; * * SYNOPSIS * - * SilcThreadQueue silc_thread_queue_alloc(void); + * SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo); * * DESCRIPTION * @@ -76,8 +80,20 @@ typedef struct SilcThreadQueueStruct *SilcThreadQueue; * queue it must first connect to it by calling silc_thread_queue_connect. * The thread that creates the queue automatically connects to the queue. * + * The 'num_pipes' specifies the number of pipes that exist in the queue. + * If `num_pipes' is 0, the 0 is ignored and one pipe is created anyway. + * By default, caller should create one pipe, unless more are needed. + * The pipes are referenced by index. First pipe has index 0, second + * index 1, and so on. The index is given as argument when pushing + * and popping from the queue. + * + * By default data popped from the queue is done in last-in-first-out + * order; the most recently added data is popped first. If `fifo' is + * set to TRUE the order is first-in-first-out; the first added data is + * popped first. 
+ * ***/ -SilcThreadQueue silc_thread_queue_alloc(void); +SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo); /****f* silcutil/silc_thread_queue_connect * @@ -99,7 +115,7 @@ void silc_thread_queue_connect(SilcThreadQueue queue); * * SYNOPSIS * - * void silc_thread_queue_disconnect(SilcThreadQueue queue); + * SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue); * * DESCRIPTION * @@ -107,30 +123,41 @@ void silc_thread_queue_connect(SilcThreadQueue queue); * called after the thread has finished using the thread queue. * * When the last thread has disconnected from the queue the queue is - * destroyed. + * destroyed and this returns FALSE. Otherwise this returns TRUE as + * long as there are threads connected to the queue. * ***/ -void silc_thread_queue_disconnect(SilcThreadQueue queue); +SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue); /****f* silcutil/silc_thread_queue_push * * SYNOPSIS * - * void silc_thread_queue_push(SilcThreadQueue queue, void *data); + * void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, + * void *data, SilcBool demux); * * DESCRIPTION * * Pushes the `data' into the thread queue. The data will become - * immediately available in the queue for other threads. + * immediately available in the queue for other threads. The `pipe_index' + * specifies the pipe to push the data into. First pipe has index 0, + * second has index 1, and so on. If there is only one pipe the index + * is always 0. + * + * If the `demux' is TRUE this will perform demuxing; data pushed to one + * pipe will be pushed to all pipes. In this case the `pipe_index' is + * ignored. Each pipe will return the same data when popped. 
* ***/ -void silc_thread_queue_push(SilcThreadQueue queue, void *data); +void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data, + SilcBool demux); /****f* silcutil/silc_thread_queue_pop * * SYNOPSIS * - * void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block); + * void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index, + * SilcBool block); * * DESCRIPTION * @@ -139,15 +166,20 @@ void silc_thread_queue_push(SilcThreadQueue queue, void *data); * If `block' is FALSE and data is not available this will return NULL. * If `block' is TRUE this will never return NULL. * + * The `pipe_index' specifies the pipe from which to pop the data. + * First pipe has index 0, second has index 1, and so on. If there is + * only one pipe the index is always 0. + * ***/ -void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block); +void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index, + SilcBool block); /****f* silcutil/silc_thread_queue_timed_pop * * SYNOPSIS * * void *silc_thread_queue_timed_pop(SilcThreadQueue queue, - * int timeout_msec); + * int pipe_index, int timeout_msec); * * DESCRIPTION * @@ -155,8 +187,12 @@ void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block); * milliseconds for the data to arrive. If data is not available when * the timeout occurrs this returns NULL. * + * The `pipe_index' specifies the pipe from which to pop the data. + * First pipe has index 0, second has index 1, and so on. If there is + * only one pipe the index is always 0. 
+ * ***/ -void *silc_thread_queue_timed_pop(SilcThreadQueue queue, +void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index, int timeout_msec); /****f* silcutil/silc_thread_queue_pop_list @@ -164,7 +200,7 @@ void *silc_thread_queue_timed_pop(SilcThreadQueue queue, * SYNOPSIS * * SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, - * SilcBool block); + * int pipe_index, SilcBool block); * * DESCRIPTION * @@ -174,7 +210,12 @@ void *silc_thread_queue_timed_pop(SilcThreadQueue queue, * immediately. If `block' is TRUE this will block if the queue is * empty. * + * The `pipe_index' specifies the pipe from which to pop the list. + * First pipe has index 0, second has index 1, and so on. If there is + * only one pipe the index is always 0. + * ***/ -SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block); +SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index, + SilcBool block); #endif /* SILCTHREADQUEUE_H */ diff --git a/lib/silcutil/tests/test_silcthreadqueue.c b/lib/silcutil/tests/test_silcthreadqueue.c index daecebff..74d3902e 100644 --- a/lib/silcutil/tests/test_silcthreadqueue.c +++ b/lib/silcutil/tests/test_silcthreadqueue.c @@ -17,7 +17,7 @@ SILC_FSM_STATE(test_st_start) SILC_LOG_DEBUG(("test_st_start")); - queue = silc_thread_queue_alloc(); + queue = silc_thread_queue_alloc(1, FALSE); if (!queue) { silc_fsm_next(fsm, test_st_finish); return SILC_FSM_CONTINUE; @@ -43,7 +43,7 @@ SILC_FSM_STATE(test_st_wait) SILC_LOG_DEBUG(("Wait for data")); /* Wait for data */ - data = silc_thread_queue_pop(queue, TRUE); + data = silc_thread_queue_pop(queue, 0, TRUE); if (!data || data != (void *)100) { silc_fsm_next(fsm, test_st_finish); return SILC_FSM_CONTINUE; @@ -62,7 +62,7 @@ SILC_FSM_STATE(test_st_thread_start) /* Send data */ SILC_LOG_DEBUG(("Send data")); - silc_thread_queue_push(queue, (void *)100); + silc_thread_queue_push(queue, 0, (void *)100, FALSE); silc_thread_queue_disconnect(queue); return 
SILC_FSM_FINISH; @@ -90,6 +90,8 @@ int main(int argc, char **argv) { SilcFSM fsm; + silc_runtime_init(); + if (argc > 1 && !strcmp(argv[1], "-d")) { silc_log_debug(TRUE); silc_log_debug_hexdump(TRUE); @@ -116,5 +118,7 @@ int main(int argc, char **argv) SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE")); fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE"); + silc_runtime_uninit(); + return !success; } -- 2.24.0