5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2008 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
20 #include "silcruntime.h"
22 /************************** Types and definitions ***************************/
24 /* Queue data context */
25 typedef struct SilcThreadQueueDataStruct {
26 struct SilcThreadQueueDataStruct *next;
27 void *data; /* User data */
28 } *SilcThreadQueueData;
31 typedef struct SilcThreadQueuePipeStruct {
32 SilcList queue; /* The queue */
33 SilcList freelist; /* Free list of queue data contexts */
34 SilcMutex lock; /* Queue lock */
35 SilcCond cond; /* Condition for waiting */
36 } *SilcThreadQueuePipe;
38 /* Thread queue context */
39 struct SilcThreadQueueStruct {
40 SilcThreadQueuePipe pipes; /* Queue pipes */
41 SilcAtomic32 connected; /* Number of connected threads */
42 unsigned int num_pipes : 31; /* Number of pipes */
43 unsigned int fifo : 1; /* FIFO */
46 /************************** SILC Thread Queue API ***************************/
48 /* Allocate thread queue */
50 SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo)
52 SilcThreadQueue queue;
58 queue = silc_calloc(1, sizeof(*queue));
62 SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue,
63 num_pipes, fifo ? "FIFO" : ""));
65 queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes));
70 queue->num_pipes = num_pipes;
73 for (i = 0; i < num_pipes; i++) {
74 silc_list_init(queue->pipes[i].queue,
75 struct SilcThreadQueueDataStruct, next);
76 silc_list_init(queue->pipes[i].freelist,
77 struct SilcThreadQueueDataStruct, next);
78 silc_mutex_alloc(&queue->pipes[i].lock);
79 silc_cond_alloc(&queue->pipes[i].cond);
82 silc_atomic_init32(&queue->connected, 1);
87 /* Connect current thread to queue */
89 void silc_thread_queue_connect(SilcThreadQueue queue)
91 SILC_LOG_DEBUG(("Connect to thread queue %p", queue));
92 silc_atomic_add_int32(&queue->connected, 1);
95 /* Disconnect current thread from queue */
97 SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue)
100 SilcThreadQueueData data;
102 SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue));
104 if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
108 SILC_LOG_DEBUG(("Free thread queue %p", queue));
110 for (i = 0; i < queue->num_pipes; i++) {
111 silc_cond_free(queue->pipes[i].cond);
112 silc_mutex_free(queue->pipes[i].lock);
113 silc_list_start(queue->pipes[i].queue);
114 while ((data = silc_list_get(queue->pipes[i].queue)))
116 silc_list_start(queue->pipes[i].freelist);
117 while ((data = silc_list_get(queue->pipes[i].freelist)))
121 silc_free(queue->pipes);
122 silc_atomic_uninit32(&queue->connected);
128 /* Push data to queue */
130 void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
133 SilcThreadQueueData d;
136 if (silc_unlikely(!data))
139 SILC_ASSERT(pipe_index < queue->num_pipes);
141 SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s",
142 data, queue, pipe_index, demux ? "yes" : "no"));
144 silc_mutex_lock(queue->pipes[pipe_index].lock);
146 d = silc_list_pop(queue->pipes[pipe_index].freelist);
148 d = silc_calloc(1, sizeof(*d));
155 for (i = 0; i < queue->num_pipes; i++) {
157 silc_list_add(queue->pipes[i].queue, d);
159 silc_list_insert(queue->pipes[i].queue, NULL, d);
163 silc_list_add(queue->pipes[pipe_index].queue, d);
165 silc_list_insert(queue->pipes[pipe_index].queue, NULL, d);
168 silc_cond_broadcast(queue->pipes[pipe_index].cond);
169 silc_mutex_unlock(queue->pipes[pipe_index].lock);
172 /* Get data or wait if wanted or return NULL. */
174 void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
177 SilcThreadQueueData d;
180 SILC_ASSERT(pipe_index < queue->num_pipes);
182 silc_mutex_lock(queue->pipes[pipe_index].lock);
186 while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
187 silc_cond_wait(queue->pipes[pipe_index].cond,
188 queue->pipes[pipe_index].lock);
189 silc_list_add(queue->pipes[pipe_index].freelist, d);
193 d = silc_list_pop(queue->pipes[pipe_index].queue);
196 silc_list_add(queue->pipes[pipe_index].freelist, d);
201 SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data,
204 silc_mutex_unlock(queue->pipes[pipe_index].lock);
209 /* Get data or wait for a while */
211 void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
214 SilcThreadQueueData d;
217 SILC_ASSERT(pipe_index < queue->num_pipes);
219 silc_mutex_lock(queue->pipes[pipe_index].lock);
221 while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
222 if (!silc_cond_timedwait(queue->pipes[pipe_index].cond,
223 queue->pipes[pipe_index].lock, timeout_msec))
227 silc_list_add(queue->pipes[pipe_index].freelist, d);
231 SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue,
234 silc_mutex_unlock(queue->pipes[pipe_index].lock);
239 /* Pop entire queue */
241 SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
244 SilcThreadQueueData d;
247 SILC_ASSERT(pipe_index < queue->num_pipes);
249 silc_mutex_lock(queue->pipes[pipe_index].lock);
252 while (silc_list_count(queue->pipes[pipe_index].queue) == 0)
253 silc_cond_wait(queue->pipes[pipe_index].cond,
254 queue->pipes[pipe_index].lock);
256 list = silc_dlist_init();
260 silc_list_start(queue->pipes[pipe_index].queue);
261 while ((d = silc_list_get(queue->pipes[pipe_index].queue))) {
262 silc_dlist_add(list, d->data);
263 silc_list_add(queue->pipes[pipe_index].freelist, d);
266 silc_list_init(queue->pipes[pipe_index].queue,
267 struct SilcThreadQueueDataStruct, next);
269 silc_mutex_unlock(queue->pipes[pipe_index].lock);
271 silc_dlist_start(list);