Major rewrite of the SilcThreadQueue API
[runtime.git] / lib / silcutil / silcthreadqueue.c
1 /*
2
3   silcthreadqueue.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2008 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silcruntime.h"
21
22 /************************** Types and definitions ***************************/
23
24 /* Queue data context */
25 typedef struct SilcThreadQueueDataStruct {
26   struct SilcThreadQueueDataStruct *next;
27   void *data;                   /* User data */
28 } *SilcThreadQueueData;
29
30 /* Pipe */
31 typedef struct SilcThreadQueuePipeStruct {
32   SilcList queue;               /* The queue */
33   SilcList freelist;            /* Free list of queue data contexts */
34   SilcMutex lock;               /* Queue lock */
35   SilcCond cond;                /* Condition for waiting */
36 } *SilcThreadQueuePipe;
37
38 /* Thread queue context */
39 struct SilcThreadQueueStruct {
40   SilcThreadQueuePipe pipes;    /* Queue pipes */
41   SilcAtomic32 connected;       /* Number of connected threads */
42   unsigned int num_pipes  : 31; /* Number of pipes */
43   unsigned int fifo       : 1;  /* FIFO */
44 };
45
46 /************************** SILC Thread Queue API ***************************/
47
48 /* Allocate thread queue */
49
50 SilcThreadQueue silc_thread_queue_alloc(int num_pipes, SilcBool fifo)
51 {
52   SilcThreadQueue queue;
53   SilcUInt32 i;
54
55   if (!num_pipes)
56     num_pipes = 1;
57
58   queue = silc_calloc(1, sizeof(*queue));
59   if (!queue)
60     return NULL;
61
62   SILC_LOG_DEBUG(("Allocated thread queue %p, %d pipes %s", queue,
63                   num_pipes, fifo ? "FIFO" : ""));
64
65   queue->pipes = silc_calloc(num_pipes, sizeof(*queue->pipes));
66   if (!queue->pipes) {
67     silc_free(queue);
68     return NULL;
69   }
70   queue->num_pipes = num_pipes;
71   queue->fifo = fifo;
72
73   for (i = 0; i < num_pipes; i++) {
74     silc_list_init(queue->pipes[i].queue,
75                    struct SilcThreadQueueDataStruct, next);
76     silc_list_init(queue->pipes[i].freelist,
77                    struct SilcThreadQueueDataStruct, next);
78     silc_mutex_alloc(&queue->pipes[i].lock);
79     silc_cond_alloc(&queue->pipes[i].cond);
80   }
81
82   silc_atomic_init32(&queue->connected, 1);
83
84   return queue;
85 }
86
87 /* Connect current thread to queue */
88
89 void silc_thread_queue_connect(SilcThreadQueue queue)
90 {
91   SILC_LOG_DEBUG(("Connect to thread queue %p", queue));
92   silc_atomic_add_int32(&queue->connected, 1);
93 }
94
95 /* Disconnect current thread from queue */
96
97 SilcBool silc_thread_queue_disconnect(SilcThreadQueue queue)
98 {
99   SilcUInt32 i;
100   SilcThreadQueueData data;
101
102   SILC_LOG_DEBUG(("Disconnect from thread queue %p", queue));
103
104   if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
105     return TRUE;
106
107   /* Free queue */
108   SILC_LOG_DEBUG(("Free thread queue %p", queue));
109
110   for (i = 0; i < queue->num_pipes; i++) {
111     silc_cond_free(queue->pipes[i].cond);
112     silc_mutex_free(queue->pipes[i].lock);
113     silc_list_start(queue->pipes[i].queue);
114     while ((data = silc_list_get(queue->pipes[i].queue)))
115       silc_free(data);
116     silc_list_start(queue->pipes[i].freelist);
117     while ((data = silc_list_get(queue->pipes[i].freelist)))
118       silc_free(data);
119   }
120
121   silc_free(queue->pipes);
122   silc_atomic_uninit32(&queue->connected);
123   silc_free(queue);
124
125   return FALSE;
126 }
127
128 /* Push data to queue */
129
130 void silc_thread_queue_push(SilcThreadQueue queue, int pipe_index, void *data,
131                             SilcBool demux)
132 {
133   SilcThreadQueueData d;
134   SilcUInt32 i;
135
136   if (silc_unlikely(!data))
137     return;
138
139   SILC_ASSERT(pipe_index < queue->num_pipes);
140
141   SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s",
142                   data, queue, pipe_index, demux ? "yes" : "no"));
143
144   silc_mutex_lock(queue->pipes[pipe_index].lock);
145
146   d = silc_list_pop(queue->pipes[pipe_index].freelist);
147   if (!d) {
148     d = silc_calloc(1, sizeof(*d));
149     if (!d)
150       return;
151   }
152   d->data = data;
153
154   if (demux) {
155     for (i = 0; i < queue->num_pipes; i++) {
156       if (queue->fifo)
157         silc_list_add(queue->pipes[i].queue, d);
158       else
159         silc_list_insert(queue->pipes[i].queue, NULL, d);
160     }
161   } else {
162     if (queue->fifo)
163       silc_list_add(queue->pipes[pipe_index].queue, d);
164     else
165       silc_list_insert(queue->pipes[pipe_index].queue, NULL, d);
166   }
167
168   silc_cond_broadcast(queue->pipes[pipe_index].cond);
169   silc_mutex_unlock(queue->pipes[pipe_index].lock);
170 }
171
172 /* Get data or wait if wanted or return NULL. */
173
174 void *silc_thread_queue_pop(SilcThreadQueue queue, int pipe_index,
175                             SilcBool block)
176 {
177   SilcThreadQueueData d;
178   void *data;
179
180   SILC_ASSERT(pipe_index < queue->num_pipes);
181
182   silc_mutex_lock(queue->pipes[pipe_index].lock);
183
184   if (block) {
185     /* Block */
186     while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
187       silc_cond_wait(queue->pipes[pipe_index].cond,
188                      queue->pipes[pipe_index].lock);
189     silc_list_add(queue->pipes[pipe_index].freelist, d);
190     data = d->data;
191   } else {
192     /* No blocking */
193     d = silc_list_pop(queue->pipes[pipe_index].queue);
194     data = NULL;
195     if (d) {
196       silc_list_add(queue->pipes[pipe_index].freelist, d);
197       data = d->data;
198     }
199   }
200
201   SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data,
202                   queue, pipe_index));
203
204   silc_mutex_unlock(queue->pipes[pipe_index].lock);
205
206   return data;
207 }
208
209 /* Get data or wait for a while */
210
211 void *silc_thread_queue_timed_pop(SilcThreadQueue queue, int pipe_index,
212                                   int timeout_msec)
213 {
214   SilcThreadQueueData d;
215   void *data = NULL;
216
217   SILC_ASSERT(pipe_index < queue->num_pipes);
218
219   silc_mutex_lock(queue->pipes[pipe_index].lock);
220
221   while ((d = silc_list_pop(queue->pipes[pipe_index].queue)) == NULL)
222     if (!silc_cond_timedwait(queue->pipes[pipe_index].cond,
223                              queue->pipes[pipe_index].lock, timeout_msec))
224       break;
225
226   if (d) {
227     silc_list_add(queue->pipes[pipe_index].freelist, d);
228     data = d->data;
229   }
230
231   SILC_LOG_DEBUG(("Pop data %p from thread queue %p, pipe %d", data, queue,
232                   pipe_index));
233
234   silc_mutex_unlock(queue->pipes[pipe_index].lock);
235
236   return data;
237 }
238
239 /* Pop entire queue */
240
241 SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, int pipe_index,
242                                      SilcBool block)
243 {
244   SilcThreadQueueData d;
245   SilcDList list;
246
247   SILC_ASSERT(pipe_index < queue->num_pipes);
248
249   silc_mutex_lock(queue->pipes[pipe_index].lock);
250
251   if (block)
252     while (silc_list_count(queue->pipes[pipe_index].queue) == 0)
253       silc_cond_wait(queue->pipes[pipe_index].cond,
254                      queue->pipes[pipe_index].lock);
255
256   list = silc_dlist_init();
257   if (!list)
258     return NULL;
259
260   silc_list_start(queue->pipes[pipe_index].queue);
261   while ((d = silc_list_get(queue->pipes[pipe_index].queue))) {
262     silc_dlist_add(list, d->data);
263     silc_list_add(queue->pipes[pipe_index].freelist, d);
264   }
265
266   silc_list_init(queue->pipes[pipe_index].queue,
267                  struct SilcThreadQueueDataStruct, next);
268
269   silc_mutex_unlock(queue->pipes[pipe_index].lock);
270
271   silc_dlist_start(list);
272
273   return list;
274 }