/************************** Types and definitions ***************************/
-/* Thread queue context */
-struct SilcThreadQueueStruct {
- SilcDList queue; /* The queue */
+/* Queue data context */
+typedef struct SilcThreadQueueDataStruct {
+ struct SilcThreadQueueDataStruct *next;
+ void *data; /* User data */
+} *SilcThreadQueueData;
+
+/* Pipe */
+typedef struct SilcThreadQueuePipeStruct {
+ SilcList queue; /* The queue */
+ SilcList freelist; /* Free list of queue data contexts */
SilcMutex lock; /* Queue lock */
SilcCond cond; /* Condition for waiting */
+} *SilcThreadQueuePipe;
+
+/* Thread queue context */
+struct SilcThreadQueueStruct {
+ SilcThreadQueuePipe pipes; /* Queue pipes */
SilcAtomic32 connected; /* Number of connected threads */
+ unsigned int num_pipes : 31; /* Number of pipes */
+ unsigned int fifo : 1; /* FIFO */
};
/************************** SILC Thread Queue API ***************************/
/* Allocate thread queue */
-SilcThreadQueue silc_thread_queue_alloc(void)
+SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo)
{
SilcThreadQueue queue;
+ SilcUInt32 i;
+
+ if (!num_pipes)
+ num_pipes = 1;
queue = silc_calloc(1, sizeof(*queue));
if (!queue)
return NULL;
- SILC_LOG_DEBUG(("Allocated thread queue %p", queue));
-
- if (!silc_mutex_alloc(&queue->lock)) {
- silc_free(queue);
- return NULL;
- }
+ SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue,
+ num_pipes, fifo ? "FIFO" : ""));
- if (!silc_cond_alloc(&queue->cond)) {
- silc_mutex_free(queue->lock);
+ queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes));
+ if (!queue->pipes) {
silc_free(queue);
return NULL;
}
-
- queue->queue = silc_dlist_init();
- if (!queue->queue) {
- silc_cond_free(queue->cond);
- silc_mutex_free(queue->lock);
- silc_free(queue);
- return NULL;
+ queue->num_pipes = num_pipes;
+ queue->fifo = fifo;
+
+ for (i = 0; i < num_pipes; i++) {
+ silc_list_init(queue->pipes[i].queue,
+ struct SilcThreadQueueDataStruct, next);
+ silc_list_init(queue->pipes[i].freelist,
+ struct SilcThreadQueueDataStruct, next);
+ silc_mutex_alloc(&queue->pipes[i].lock);
+ silc_cond_alloc(&queue->pipes[i].cond);
}
silc_atomic_init32(&queue->connected, 1);
void silc_thread_queue_connect(SilcThreadQueue queue)
{
+ SILC_LOG_DEBUG(("Connect to thread queue %p", queue));
silc_atomic_add_int32(&queue->connected, 1);
}
/* Disconnect current thread from queue */
-void silc_thread_queue_disconnect(SilcThreadQueue queue)
+SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue)
{
+ SilcUInt32 i;
+ SilcThreadQueueData data;
+
+ SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue));
+
if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
- return;
+ return TRUE;
/* Free queue */
SILC_LOG_DEBUG(("Free thread queue %p", queue));
- silc_cond_free(queue->cond);
- silc_mutex_free(queue->lock);
- silc_dlist_uninit(queue->queue);
+
+ for (i = 0; i < queue->num_pipes; i++) {
+ silc_cond_free(queue->pipes[i].cond);
+ silc_mutex_free(queue->pipes[i].lock);
+ silc_list_start(queue->pipes[i].queue);
+ while ((data = silc_list_get(queue->pipes[i].queue)))
+ silc_free(data);
+ silc_list_start(queue->pipes[i].freelist);
+ while ((data = silc_list_get(queue->pipes[i].freelist)))
+ silc_free(data);
+ }
+
+ silc_free(queue->pipes);
silc_atomic_uninit32(&queue->connected);
silc_free(queue);
+
+ return FALSE;
}
/* Push data to queue */
-void silc_thread_queue_push(SilcThreadQueue queue, void *data)
+void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
+ SilcBool demux)
{
+ SilcThreadQueueData d;
+ SilcUInt32 i;
+
if (silc_unlikely(!data))
return;
- SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue));
+ SILC_ASSERT(pipe_index < queue->num_pipes);
+
+ SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s",
+ data, queue, pipe_index, demux ? "yes" : "no"));
+
+ silc_mutex_lock(queue->pipes[pipe_index].lock);
+
+ d = silc_list_pop(queue->pipes[pipe_index].freelist);
+ if (!d) {
+ d = silc_calloc(1, sizeof(*d));
+ if (!d)
+ return;
+ }
+ d->data = data;
+
+ if (demux) {
+ for (i = 0; i < queue->num_pipes; i++) {
+ if (queue->fifo)
+ silc_list_add(queue->pipes[i].queue, d);
+ else
+ silc_list_insert(queue->pipes[i].queue, NULL, d);
+ }
+ } else {
+ if (queue->fifo)
+ silc_list_add(queue->pipes[pipe_index].queue, d);
+ else
+ silc_list_insert(queue->pipes[pipe_index].queue, NULL, d);
+ }
- silc_mutex_lock(queue->lock);
- silc_dlist_start(queue->queue);
- silc_dlist_insert(queue->queue, data);
- silc_cond_broadcast(queue->cond);
- silc_mutex_unlock(queue->lock);
+ silc_cond_broadcast(queue->pipes[pipe_index].cond);
+ silc_mutex_unlock(queue->pipes[pipe_index].lock);
}
/* Get data or wait if wanted or return NULL. */
-void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block)
+void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+ SilcBool block)
{
+ SilcThreadQueueData d;
void *data;
- if (block)
- return silc_thread_queue_timed_pop(queue, 0);
-
- silc_mutex_lock(queue->lock);
-
- silc_dlist_start(queue->queue);
- data = silc_dlist_get(queue->queue);
- if (data)
- silc_dlist_del(queue->queue, data);
+ SILC_ASSERT(pipe_index < queue->num_pipes);
+
+ silc_mutex_lock(queue->pipes[pipe_index].lock);
+
+ if (block) {
+ /* Block */
+ while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
+ silc_cond_wait(queue->pipes[pipe_index].cond,
+ queue->pipes[pipe_index].lock);
+ silc_list_add(queue->pipes[pipe_index].freelist, d);
+ data = d->data;
+ } else {
+ /* No blocking */
+ d = silc_list_pop(queue->pipes[pipe_index].queue);
+ data = NULL;
+ if (d) {
+ silc_list_add(queue->pipes[pipe_index].freelist, d);
+ data = d->data;
+ }
+ }
- SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+ SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data,
+ queue, pipe_index));
- silc_mutex_unlock(queue->lock);
+ silc_mutex_unlock(queue->pipes[pipe_index].lock);
return data;
}
/* Get data or wait for a while */
-void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
int timeout_msec)
{
- void *data;
+ SilcThreadQueueData d;
+ void *data = NULL;
+
+ SILC_ASSERT(pipe_index < queue->num_pipes);
- silc_mutex_lock(queue->lock);
+ silc_mutex_lock(queue->pipes[pipe_index].lock);
- silc_dlist_start(queue->queue);
- while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) {
- if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec))
+ while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
+ if (!silc_cond_timedwait(queue->pipes[pipe_index].cond,
+ queue->pipes[pipe_index].lock, timeout_msec))
break;
- silc_dlist_start(queue->queue);
- }
- if (data)
- silc_dlist_del(queue->queue, data);
+ if (d) {
+ silc_list_add(queue->pipes[pipe_index].freelist, d);
+ data = d->data;
+ }
- SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
+ SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue,
+ pipe_index));
- silc_mutex_unlock(queue->lock);
+ silc_mutex_unlock(queue->pipes[pipe_index].lock);
return data;
}
/* Pop entire queue */
-SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block)
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
+ SilcBool block)
{
+ SilcThreadQueueData d;
SilcDList list;
- silc_mutex_lock(queue->lock);
+ SILC_ASSERT(pipe_index < queue->num_pipes);
+
+ silc_mutex_lock(queue->pipes[pipe_index].lock);
if (block)
- while (silc_dlist_count(queue->queue) == 0)
- silc_cond_wait(queue->cond, queue->lock);
+ while (silc_list_count(queue->pipes[pipe_index].queue) == 0)
+ silc_cond_wait(queue->pipes[pipe_index].cond,
+ queue->pipes[pipe_index].lock);
+
+ list = silc_dlist_init();
+ if (!list)
+ return NULL;
+
+ silc_list_start(queue->pipes[pipe_index].queue);
+ while ((d = silc_list_get(queue->pipes[pipe_index].queue))) {
+ silc_dlist_add(list, d->data);
+ silc_list_add(queue->pipes[pipe_index].freelist, d);
+ }
- list = queue->queue;
- queue->queue = silc_dlist_init();
+ silc_list_init(queue->pipes[pipe_index].queue,
+ struct SilcThreadQueueDataStruct, next);
- silc_mutex_unlock(queue->lock);
+ silc_mutex_unlock(queue->pipes[pipe_index].lock);
silc_dlist_start(list);
* takes the data from the queue or blocks until more data is available
* in the queue.
*
+ * The queue itself can have one ore more pipes, allowing user to use one
+ * queue to pass different information in different pipes, if each pipe
+ * need to be dedicated to specific type of data.
+ *
* EXAMPLE
*
* Thread 1:
*
* // Create queue and push data into it
- * SilcThreadQueue queue = silc_thread_queue_alloc();
- * silc_thread_queue_push(queue, data);
+ * SilcThreadQueue queue = silc_thread_queue_alloc(1, FALSE);
+ * silc_thread_queue_push(queue, 0, data, FALSE);
*
* Thread 2:
*
* silc_thread_queue_connect(queue);
*
* // Block here until data is available from the queue
- * data = silc_thread_queue_pop(queue, TRUE);
+ * data = silc_thread_queue_pop(queue, 0, TRUE);
*
***/
*
* SYNOPSIS
*
- * SilcThreadQueue silc_thread_queue_alloc(void);
+ * SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo);
*
* DESCRIPTION
*
* queue it must first connect to it by calling silc_thread_queue_connect.
* The thread that creates the queue automatically connects to the queue.
*
+ * The 'num_pipes' specifies the number of pipes that exist in the queue.
+ * If `num_pipes' is 0, the 0 is ignored and one pipe is created anyway.
+ * By default, caller should create one pipe, unless more are needed.
+ * The pipes are referenced by index. First pipe has index 0, second
+ * index 1, and so on. The index is given as argument when pushing
+ * and popping from the queue.
+ *
+ * By default data popped from the queue is done in last-in-first-out
+ * order; the most recently added data is popped first. If `fifo' is
+ * set to TRUE the order is first-in-first-out; the first added data is
+ * popped first.
+ *
***/
-SilcThreadQueue silc_thread_queue_alloc(void);
+SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo);
/****f* silcutil/silc_thread_queue_connect
*
*
* SYNOPSIS
*
- * void silc_thread_queue_disconnect(SilcThreadQueue queue);
+ * SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue);
*
* DESCRIPTION
*
* called after the thread has finished using the thread queue.
*
* When the last thread has disconnected from the queue the queue is
- * destroyed.
+ * destroyed and this returns FALSE. Otherwise this returns TRUE as
+ * long as there are threads connected to the queue.
*
***/
-void silc_thread_queue_disconnect(SilcThreadQueue queue);
+SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue);
/****f* silcutil/silc_thread_queue_push
*
* SYNOPSIS
*
- * void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+ * void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index,
+ * void *data, SilcBool demux);
*
* DESCRIPTION
*
* Pushes the `data' into the thread queue. The data will become
- * immediately available in the queue for other threads.
+ * immediately available in the queue for other threads. The `pipe_index'
+ * specifies the pipe to push the data into. First pipe has index 0,
+ * second has index 1, and so on. If there is only one pipe the index
+ * is always 0.
+ *
+ * If the `demux' is TRUE this will perform demuxing; data pushed to one
+ * pipe will be pushed to all pipes. In this case the `pipe_index' is
+ * ignored. Each pipe will return the same data when popped.
*
***/
-void silc_thread_queue_push(SilcThreadQueue queue, void *data);
+void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
+ SilcBool demux);
/****f* silcutil/silc_thread_queue_pop
*
* SYNOPSIS
*
- * void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+ * void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+ * SilcBool block);
*
* DESCRIPTION
*
* If `block' is FALSE and data is not available this will return NULL.
* If `block' is TRUE this will never return NULL.
*
+ * The `pipe_index' specifies the pipe from which to pop the data.
+ * First pipe has index 0, second has index 1, and so on. If there is
+ * only one pipe the index is always 0.
+ *
***/
-void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block);
+void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
+ SilcBool block);
/****f* silcutil/silc_thread_queue_timed_pop
*
* SYNOPSIS
*
* void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
- * int timeout_msec);
+ * int pipe_index, int timeout_msec);
*
* DESCRIPTION
*
* milliseconds for the data to arrive. If data is not available when
* the timeout occurrs this returns NULL.
*
+ * The `pipe_index' specifies the pipe from which to pop the data.
+ * First pipe has index 0, second has index 1, and so on. If there is
+ * only one pipe the index is always 0.
+ *
***/
-void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
+void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
int timeout_msec);
/****f* silcutil/silc_thread_queue_pop_list
* SYNOPSIS
*
* SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue,
- * SilcBool block);
+ * int pipe_index, SilcBool block);
*
* DESCRIPTION
*
* immediately. If `block' is TRUE this will block if the queue is
* empty.
*
+ * The `pipe_index' specifies the pipe from which to pop the list.
+ * First pipe has index 0, second has index 1, and so on. If there is
+ * only one pipe the index is always 0.
+ *
***/
-SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block);
+SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
+ SilcBool block);
#endif /* SILCTHREADQUEUE_H */