Added SILC Thread Queue API
[silc.git] / lib / silcutil / silcthreadqueue.c
1 /*
2
3   silcthreadqueue.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2008 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silcruntime.h"
21
22 /************************** Types and definitions ***************************/
23
24 /* Thread queue context */
25 struct SilcThreadQueueStruct {
26   SilcDList queue;              /* The queue */
27   SilcMutex lock;               /* Queue lock */
28   SilcCond cond;                /* Condition for waiting */
29   SilcAtomic32 connected;       /* Number of connected threads */
30 };
31
32 /************************** SILC Thread Queue API ***************************/
33
34 /* Allocate thread queue */
35
36 SilcThreadQueue silc_thread_queue_alloc(void)
37 {
38   SilcThreadQueue queue;
39
40   queue = silc_calloc(1, sizeof(*queue));
41   if (!queue)
42     return NULL;
43
44   SILC_LOG_DEBUG(("Allocated thread queue %p", queue));
45
46   if (!silc_mutex_alloc(&queue->lock)) {
47     silc_free(queue);
48     return NULL;
49   }
50
51   if (!silc_cond_alloc(&queue->cond)) {
52     silc_mutex_free(queue->lock);
53     silc_free(queue);
54     return NULL;
55   }
56
57   queue->queue = silc_dlist_init();
58   if (!queue->queue) {
59     silc_cond_free(queue->cond);
60     silc_mutex_free(queue->lock);
61     silc_free(queue);
62     return NULL;
63   }
64
65   silc_atomic_init32(&queue->connected, 1);
66
67   return queue;
68 }
69
70 /* Connect current thread to queue */
71
72 void silc_thread_queue_connect(SilcThreadQueue queue)
73 {
74   silc_atomic_add_int32(&queue->connected, 1);
75 }
76
77 /* Disconnect current thread from queue */
78
79 void silc_thread_queue_disconnect(SilcThreadQueue queue)
80 {
81   if (silc_atomic_sub_int32(&queue->connected, 1) > 0)
82     return;
83
84   /* Free queue */
85   SILC_LOG_DEBUG(("Free thread queue %p", queue));
86   silc_cond_free(queue->cond);
87   silc_mutex_free(queue->lock);
88   silc_dlist_uninit(queue->queue);
89   silc_atomic_uninit32(&queue->connected);
90   silc_free(queue);
91 }
92
93 /* Push data to queue */
94
95 void silc_thread_queue_push(SilcThreadQueue queue, void *data)
96 {
97   if (silc_unlikely(!data))
98     return;
99
100   SILC_LOG_DEBUG(("Push data %p to thread queue %p", data, queue));
101
102   silc_mutex_lock(queue->lock);
103   silc_dlist_start(queue->queue);
104   silc_dlist_insert(queue->queue, data);
105   silc_cond_broadcast(queue->cond);
106   silc_mutex_unlock(queue->lock);
107 }
108
109 /* Get data or wait if wanted or return NULL. */
110
111 void *silc_thread_queue_pop(SilcThreadQueue queue, SilcBool block)
112 {
113   void *data;
114
115   if (block)
116     return silc_thread_queue_timed_pop(queue, 0);
117
118   silc_mutex_lock(queue->lock);
119
120   silc_dlist_start(queue->queue);
121   data = silc_dlist_get(queue->queue);
122   if (data)
123     silc_dlist_del(queue->queue, data);
124
125   SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
126
127   silc_mutex_unlock(queue->lock);
128
129   return data;
130 }
131
132 /* Get data or wait for a while */
133
134 void *silc_thread_queue_timed_pop(SilcThreadQueue queue,
135                                   int timeout_msec)
136 {
137   void *data;
138
139   silc_mutex_lock(queue->lock);
140
141   silc_dlist_start(queue->queue);
142   while ((data = silc_dlist_get(queue->queue)) == SILC_LIST_END) {
143     if (!silc_cond_timedwait(queue->cond, queue->lock, timeout_msec))
144       break;
145     silc_dlist_start(queue->queue);
146   }
147
148   if (data)
149     silc_dlist_del(queue->queue, data);
150
151   SILC_LOG_DEBUG(("Pop data %p from thread queue %p", data, queue));
152
153   silc_mutex_unlock(queue->lock);
154
155   return data;
156 }
157
158 /* Pop entire queue */
159
160 SilcDList silc_thread_queue_pop_list(SilcThreadQueue queue, SilcBool block)
161 {
162   SilcDList list;
163
164   silc_mutex_lock(queue->lock);
165
166   if (block)
167     while (silc_dlist_count(queue->queue) == 0)
168       silc_cond_wait(queue->cond, queue->lock);
169
170   list = queue->queue;
171   queue->queue = silc_dlist_init();
172
173   silc_mutex_unlock(queue->lock);
174
175   silc_dlist_start(list);
176
177   return list;
178 }