Added SILC Buffer Stream API
[runtime.git] / lib / silcutil / silcbufferstream.c
1 /*
2
3   silcbufferstream.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 2008 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include <silcruntime.h>
21
22 /************************** Types and definitions ***************************/
23
24 #define SILC_IS_BUFFER_STREAM(s) (s && s->ops == &silc_buffer_stream_ops)
25
26 const SilcStreamOps silc_buffer_stream_ops;
27
28 /* Buffer stream context */
29 typedef struct {
30   const SilcStreamOps *ops;
31   SilcStream stream;
32   SilcBuffer outbuf;
33   SilcBuffer inbuf;
34   SilcBufferStruct queue;
35   SilcBufferReceiveCallback receiver;
36   void *context;
37   unsigned int closed   : 1;
38 } *SilcBufferStream;
39
40 /************************ Static utility functions **************************/
41
42 /* IO callback */
43
44 static void silc_buffer_stream_io(SilcStream stream,
45                                   SilcStreamStatus status,
46                                   void *context)
47 {
48   SilcBufferStream bs = context;
49   SilcBuffer buffer = NULL;
50   SilcUInt32 buf_len;
51   int ret, len;
52
53   if (bs->closed)
54     return;
55
56   if (status == SILC_STREAM_CAN_READ) {
57     /* Read data */
58     SILC_LOG_DEBUG(("Read data from buffer stream %p", bs));
59
60     while ((ret = silc_stream_read(bs->stream, bs->inbuf->tail,
61                                    silc_buffer_taillen(bs->inbuf))) > 0) {
62       if (!buffer) {
63         buffer = silc_buffer_alloc(0);
64         if (!buffer)
65           return;
66       }
67
68       silc_buffer_pull_tail(bs->inbuf, ret);
69
70       /* Parse the buffer */
71       while ((len = silc_buffer_unformat(bs->inbuf,
72                                          SILC_STR_BUFFER_ALLOC(buffer),
73                                          SILC_STR_END)) > 0) {
74         /* Deliver the buffer */
75         SILC_LOG_HEXDUMP(("Received buffer, size %d",
76                           silc_buffer_len(buffer)),
77                          silc_buffer_data(buffer), silc_buffer_len(buffer));
78         bs->receiver(SILC_OK, (SilcStream)bs, buffer, bs->context);
79         silc_buffer_pull(bs->inbuf, len);
80
81         buffer = silc_buffer_alloc(0);
82         if (!buffer)
83           return;
84       }
85
86       if (silc_buffer_len(bs->inbuf) > 0) {
87         /* Not complete buffer, read more data */
88         buf_len = 4;
89         if (silc_buffer_len(bs->inbuf) >= 4) {
90           SILC_GET32_MSB(buf_len, bs->inbuf->data);
91           SILC_LOG_DEBUG(("Incomplete buffer, wait for rest, buffer size %d",
92                           buf_len));
93         }
94
95         /* Enlarge inbuf if needed */
96         if (silc_buffer_taillen(bs->inbuf) < buf_len)
97           silc_buffer_realloc(bs->inbuf, silc_buffer_truelen(bs->inbuf) +
98                               buf_len);
99         continue;
100       }
101
102       /* All data read, read more */
103       silc_buffer_reset(bs->inbuf);
104     }
105
106     silc_buffer_free(buffer);
107
108     if (ret == 0 || ret == -2) {
109       bs->receiver(silc_errno, (SilcStream)bs, NULL, bs->context);
110       return;
111     }
112   } else {
113     /* Write any pending data */
114     SILC_LOG_DEBUG(("Write pending data to buffer stream %p", bs));
115
116     while (silc_buffer_len(&bs->queue) > 0) {
117       ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
118                               silc_buffer_len(&bs->queue));
119       if (silc_unlikely(ret == 0))
120         return;
121
122       if (silc_unlikely(ret == -2))
123         return;
124
125       if (silc_unlikely(ret == -1)) {
126         SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
127         return;
128       }
129
130       /* Wrote data */
131       silc_buffer_pull(&bs->queue, ret);
132     }
133
134     memset(&bs->queue, 0, sizeof(bs->queue));
135     silc_buffer_reset(bs->outbuf);
136   }
137 }
138
139 /****************************** Public API **********************************/
140
141 /* Create buffer stream */
142
143 SilcStream silc_buffer_stream_create(SilcStream stream,
144                                      SilcBufferReceiveCallback receiver,
145                                      void *context)
146 {
147   SilcBufferStream bs;
148
149   if (!stream || !receiver) {
150     silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
151     return NULL;
152   }
153
154   bs = silc_calloc(1, sizeof(*bs));
155   if (!bs)
156     return NULL;
157
158   SILC_LOG_DEBUG(("Created new buffer stream %p", bs));
159
160   bs->ops = &silc_buffer_stream_ops;
161   bs->stream = stream;
162   bs->receiver = receiver;
163   bs->context = context;
164   bs->inbuf = silc_buffer_alloc(32);
165   bs->outbuf = silc_buffer_alloc(0);
166   if (!bs->inbuf || !bs->outbuf) {
167     silc_buffer_free(bs->inbuf);
168     silc_buffer_free(bs->outbuf);
169     silc_free(bs);
170     return NULL;
171   }
172
173   /* Set IO callback to the underlaying stream */
174   silc_stream_set_notifier(bs->stream,
175                            silc_stream_get_schedule(bs->stream),
176                            silc_buffer_stream_io, bs);
177
178   return (SilcStream)bs;
179 }
180
181 /* Send buffer to stream */
182
183 SilcBool silc_buffer_stream_send(SilcStream stream,
184                                  SilcBuffer buffer)
185 {
186   SilcBufferStream bs = stream;
187   int ret;
188
189   SILC_LOG_HEXDUMP(("Send to buffer stream %p %d bytes", bs,
190                     silc_buffer_len(buffer)),
191                    silc_buffer_data(buffer), silc_buffer_len(buffer));
192
193   if (silc_unlikely(!SILC_IS_BUFFER_STREAM(bs))) {
194     silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
195     return FALSE;
196   }
197
198   if (silc_unlikely(!buffer)) {
199     silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
200     return FALSE;
201   }
202
203   if (silc_unlikely(bs->closed)) {
204     SILC_LOG_DEBUG(("Buffer stream %p is closed", bs));
205     silc_set_errno(SILC_ERR_NOT_VALID);
206     return FALSE;
207   }
208
209   /* Put to queue */
210   if (silc_buffer_format(bs->outbuf,
211                          SILC_STR_ADVANCE,
212                          SILC_STR_BUFFER(buffer),
213                          SILC_STR_END) < 0)
214     return FALSE;
215
216   ret = silc_buffer_headlen(&bs->queue);
217   bs->queue.head = bs->outbuf->head;
218   bs->queue.data = bs->queue.head + ret;
219   bs->queue.tail = bs->outbuf->data;
220   bs->queue.end  = bs->outbuf->end;
221
222   /* Write the queue buffer */
223   while (silc_buffer_len(&bs->queue) > 0) {
224     ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
225                             silc_buffer_len(&bs->queue));
226     if (silc_unlikely(ret == 0))
227       return FALSE;
228
229     if (silc_unlikely(ret == -2))
230       return FALSE;
231
232     if (silc_unlikely(ret == -1)) {
233       SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
234       return TRUE;
235     }
236
237     /* Wrote data */
238     silc_buffer_pull(&bs->queue, ret);
239   }
240
241   memset(&bs->queue, 0, sizeof(bs->queue));
242   silc_buffer_reset(bs->outbuf);
243
244   SILC_LOG_DEBUG(("Buffer sent to buffer stream %p", bs));
245
246   return TRUE;
247 }
248
249 /******************************* Stream API *********************************/
250
251 int silc_buffer_stream_read(SilcStream stream, unsigned char *buf,
252                             SilcUInt32 buf_len)
253 {
254   SILC_LOG_ERROR(("The silc_stream_read cannot be used with buffer streams"));
255   return -2;
256 }
257
258 int silc_buffer_stream_write(SilcStream stream, const unsigned char *data,
259                              SilcUInt32 data_len)
260 {
261   SILC_LOG_ERROR(("Use silc_buffer_stream_send with buffer streams"));
262   return -2;
263 }
264
265 SilcBool silc_buffer_stream_close(SilcStream stream)
266 {
267   SilcBufferStream bs = stream;
268
269   SILC_LOG_DEBUG(("Closing buffer stream %p", bs));
270
271   bs->closed = TRUE;
272   silc_stream_set_notifier(bs->stream,
273                            silc_stream_get_schedule(bs->stream), NULL, NULL);
274
275   return TRUE;
276 }
277
278 void silc_buffer_stream_destroy(SilcStream stream)
279 {
280   SilcBufferStream bs = stream;
281
282   SILC_LOG_DEBUG(("Destroying buffer stream %p", bs));
283
284   silc_buffer_stream_close(stream);
285   silc_buffer_free(bs->outbuf);
286   silc_buffer_free(bs->inbuf);
287   silc_free(bs);
288 }
289
290 SilcBool silc_buffer_stream_notifier(SilcStream stream,
291                                      SilcSchedule schedule,
292                                      SilcStreamNotifier callback,
293                                      void *context)
294 {
295   SILC_LOG_ERROR(("The silc_stream_set_notifier cannot be used with "
296                   "buffer streams"));
297   return FALSE;
298 }
299
300 SilcSchedule silc_buffer_stream_get_schedule(SilcStream stream)
301 {
302   SilcBufferStream bs = stream;
303   return silc_stream_get_schedule(bs->stream);
304 }
305
306 /* Buffer stream operations */
307 const SilcStreamOps silc_buffer_stream_ops =
308 {
309   silc_buffer_stream_read,
310   silc_buffer_stream_write,
311   silc_buffer_stream_close,
312   silc_buffer_stream_destroy,
313   silc_buffer_stream_notifier,
314   silc_buffer_stream_get_schedule
315 };