X-Git-Url: http://git.silcnet.org/gitweb/?p=runtime.git;a=blobdiff_plain;f=lib%2Fsilcutil%2Fsilcthreadqueue.c;h=324063fc9755e5273891920410362e466b9024f5;hp=1ca9ef3b0b00094f18973fd3107bcb2afb0b6adc;hb=76c779783817ba7cdbe67ca83971134ce70e662f;hpb=480c4f6e83cbdec38af98419430450b9fd8600f8 diff --git a/lib/silcutil/silcthreadqueue.c b/lib/silcutil/silcthreadqueue.c index 1ca9ef3b..324063fc 100644 --- a/lib/silcutil/silcthreadqueue.c +++ b/lib/silcutil/silcthreadqueue.c @@ -21,45 +21,62 @@ /************************** Types and definitions ***************************/ -/* Thread queue context */ -struct SilcThreadQueueStruct { - SilcDList queue; /* The queue */ +/* Queue data context */ +typedef struct SilcThreadQueueDataStruct { + struct SilcThreadQueueDataStruct *next; + void *data; /* User data */ +} *SilcThreadQueueData; + +/* Pipe */ +typedef struct SilcThreadQueuePipeStruct { + SilcList queue; /* The queue */ + SilcList freelist; /* Free list of queue data contexts */ SilcMutex lock; /* Queue lock */ SilcCond cond; /* Condition for waiting */ +} *SilcThreadQueuePipe; + +/* Thread queue context */ +struct SilcThreadQueueStruct { + SilcThreadQueuePipe pipes; /* Queue pipes */ SilcAtomic32 connected; /* Number of connected threads */ + unsigned int num_pipes : 31; /* Number of pipes */ + unsigned int fifo : 1; /* FIFO */ }; /************************** SILC Thread Queue API ***************************/ /* Allocate thread queue */ -SilcThreadQueue silc_thread_queue_alloc(void) +SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo) { SilcThreadQueue queue; + SilcUInt32 i; + + if (!num_pipes) + num_pipes = 1; queue = silc_calloc(1, sizeof(*queue)); if (!queue) return NULL; - SILC_LOG_DEBUG(("Allocated thread queue %p", queue)); - - if (!silc_mutex_alloc(&queue->lock)) { - silc_free(queue); - return NULL; - } + SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue, + num_pipes, fifo ? "FIFO" : "")); - if (!silc_cond_alloc(&queue->cond)) { - silc_mutex_free(queue->lock); + queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes)); + if (!queue->pipes) { silc_free(queue); return NULL; } - - queue->queue = silc_dlist_init(); - if (!queue->queue) { - silc_cond_free(queue->cond); - silc_mutex_free(queue->lock); - silc_free(queue); - return NULL; + queue->num_pipes = num_pipes; + queue->fifo = fifo; + + for (i = 0; i < num_pipes; i++) { + silc_list_init(queue->pipes[i].queue, + struct SilcThreadQueueDataStruct, next); + silc_list_init(queue->pipes[i].freelist, + struct SilcThreadQueueDataStruct, next); + silc_mutex_alloc(&queue->pipes[i].lock); + silc_cond_alloc(&queue->pipes[i].cond); } silc_atomic_init32(&queue->connected, 1); @@ -71,106 +88,185 @@ SilcThreadQueue silc_thread_queue_alloc(void) void silc_thread_queue_connect(SilcThreadQueue queue) { + SILC_LOG_DEBUG(("Connect to thread queue %p", queue)); silc_atomic_add_int32(&queue->connected, 1); } /* Disconnect current thread from queue */ -void silc_thread_queue_disconnect(SilcThreadQueue queue) +SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue) { + SilcUInt32 i; + SilcThreadQueueData data; + + SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue)); + if (silc_atomic_sub_int32(&queue->connected, 1) > 0) - return; + return TRUE; /* Free queue */ SILC_LOG_DEBUG(("Free thread queue %p", queue)); - silc_cond_free(queue->cond); - silc_mutex_free(queue->lock); - silc_dlist_uninit(queue->queue); + + for (i = 0; i < queue->num_pipes; i++) { + silc_cond_free(queue->pipes[i].cond); + silc_mutex_free(queue->pipes[i].lock); + silc_list_start(queue->pipes[i].queue); + while ((data = silc_list_get(queue->pipes[i].queue))) + silc_free(data); + silc_list_start(queue->pipes[i].freelist); + while ((data = silc_list_get(queue->pipes[i].freelist))) + silc_free(data); + } + + silc_free(queue->pipes); silc_atomic_uninit32(&queue->connected); silc_free(queue); + + return FALSE; } /* Push data to queue */ -void silc_thread_queue_push(SilcThreadQueue queue, void *data) +void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data, + SilcBool demux) { + SilcThreadQueueData d; + SilcUInt32 i; + if (silc_unlikely(!data)) return; - SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue)); + SILC_ASSERT(pipe_index < queue->num_pipes); + + SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s", + data, queue, pipe_index, demux ? "yes" : "no")); + + silc_mutex_lock(queue->pipes[pipe_index].lock); + + d = silc_list_pop(queue->pipes[pipe_index].freelist); + if (!d) { + d = silc_calloc(1, sizeof(*d)); + if (!d) + return; + } + d->data = data; + + if (demux) { + for (i = 0; i < queue->num_pipes; i++) { + if (queue->fifo) + silc_list_add(queue->pipes[i].queue, d); + else + silc_list_insert(queue->pipes[i].queue, NULL, d); + } + } else { + if (queue->fifo) + silc_list_add(queue->pipes[pipe_index].queue, d); + else + silc_list_insert(queue->pipes[pipe_index].queue, NULL, d); + } - silc_mutex_lock(queue->lock); - silc_dlist_start(queue->queue); - silc_dlist_insert(queue->queue, data); - silc_cond_broadcast(queue->cond); - silc_mutex_unlock(queue->lock); + silc_cond_broadcast(queue->pipes[pipe_index].cond); + silc_mutex_unlock(queue->pipes[pipe_index].lock); } /* Get data or wait if wanted or return NULL. */ -void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block) +void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index, + SilcBool block) { + SilcThreadQueueData d; void *data; - if (block) - return silc_thread_queue_timed_pop(queue, 0); - - silc_mutex_lock(queue->lock); - - silc_dlist_start(queue->queue); - data = silc_dlist_get(queue->queue); - if (data) - silc_dlist_del(queue->queue, data); + SILC_ASSERT(pipe_index < queue->num_pipes); + + silc_mutex_lock(queue->pipes[pipe_index].lock); + + if (block) { + /* Block */ + while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL) + silc_cond_wait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock); + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } else { + /* No blocking */ + d = silc_list_pop(queue->pipes[pipe_index].queue); + data = NULL; + if (d) { + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } + } - SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, + queue, pipe_index)); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); return data; } /* Get data or wait for a while */ -void *silc_thread_queue_timed_pop(SilcThreadQueue queue, +void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index, int timeout_msec) { - void *data; + SilcThreadQueueData d; + void *data = NULL; + + SILC_ASSERT(pipe_index < queue->num_pipes); - silc_mutex_lock(queue->lock); + silc_mutex_lock(queue->pipes[pipe_index].lock); - silc_dlist_start(queue->queue); - while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) { - if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec)) + while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL) + if (!silc_cond_timedwait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock, timeout_msec)) break; - silc_dlist_start(queue->queue); - } - if (data) - silc_dlist_del(queue->queue, data); + if (d) { + silc_list_add(queue->pipes[pipe_index].freelist, d); + data = d->data; + } - SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue)); + SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue, + pipe_index)); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); return data; } /* Pop entire queue */ -SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block) +SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index, + SilcBool block) { + SilcThreadQueueData d; SilcDList list; - silc_mutex_lock(queue->lock); + SILC_ASSERT(pipe_index < queue->num_pipes); + + silc_mutex_lock(queue->pipes[pipe_index].lock); if (block) - while (silc_dlist_count(queue->queue) == 0) - silc_cond_wait(queue->cond, queue->lock); + while (silc_list_count(queue->pipes[pipe_index].queue) == 0) + silc_cond_wait(queue->pipes[pipe_index].cond, + queue->pipes[pipe_index].lock); + + list = silc_dlist_init(); + if (!list) + return NULL; + + silc_list_start(queue->pipes[pipe_index].queue); + while ((d = silc_list_get(queue->pipes[pipe_index].queue))) { + silc_dlist_add(list, d->data); + silc_list_add(queue->pipes[pipe_index].freelist, d); + } - list = queue->queue; - queue->queue = silc_dlist_init(); + silc_list_init(queue->pipes[pipe_index].queue, + struct SilcThreadQueueDataStruct, next); - silc_mutex_unlock(queue->lock); + silc_mutex_unlock(queue->pipes[pipe_index].lock); silc_dlist_start(list);