5 Author: Pekka Riikonen <priikone@silcnet.org>
7 Copyright (C) 2008 Pekka Riikonen
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; version 2 of the License.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
20 #include <silcruntime.h>
22 /************************** Types and definitions ***************************/
/* Evaluates to true when `s' is non-NULL and its `ops' pointer is the
   address of silc_buffer_stream_ops, the table that
   silc_buffer_stream_create stores into every stream it creates.  The
   argument is expanded twice and is not parenthesized, so pass a plain
   pointer variable rather than an expression. */
24 #define SILC_IS_BUFFER_STREAM(s) (s && s->ops == &silc_buffer_stream_ops)
/* Declared here so the macro above can refer to the operations table; the
   initialized definition appears further below in this file. */
26 const SilcStreamOps silc_buffer_stream_ops;
28 /* Buffer stream context */
/* NOTE(review): the struct opening and several members that the code in
   this file uses (stream, inbuf, outbuf, context) are not visible in this
   excerpt; only the members below can be described. */
/* Operations table pointer.  silc_buffer_stream_create sets it to
   &silc_buffer_stream_ops and SILC_IS_BUFFER_STREAM compares against it. */
30 const SilcStreamOps *ops;
/* Buffer descriptor whose pointers silc_buffer_stream_send aims at the
   memory of outbuf; the write loops send its data area and clear it with
   memset once everything has been written. */
34 SilcBufferStruct queue;
/* Callback that silc_buffer_stream_io calls with each parsed buffer, or
   with silc_errno and a NULL buffer when reading ends with 0 or -2. */
35 SilcBufferReceiveCallback receiver;
/* One-bit flag.  silc_buffer_stream_send refuses to send while it is set.
   NOTE(review): the place where it is set is not visible in this excerpt. */
37 unsigned int closed : 1;
40 /************************ Static utility functions **************************/
/* I/O notifier callback that silc_buffer_stream_create installs on the
   underlying stream; `context' is the buffer stream.  When `status' is
   SILC_STREAM_CAN_READ it reads from the underlying stream into `inbuf',
   parses buffers out of it and hands each one to the receiver callback.
   The later part of the function writes out data still pending in `queue'.
   NOTE(review): some lines of this function (declarations, braces, returns)
   are not visible in this excerpt, so the exact control flow between the
   read part and the write part should be confirmed against the full file. */
44 static void silc_buffer_stream_io(SilcStream stream,
45 SilcStreamStatus status,
48 SilcBufferStream bs = context;
49 SilcBuffer buffer = NULL;
56 if (status == SILC_STREAM_CAN_READ) {
58 SILC_LOG_DEBUG(("Read data from buffer stream %p", bs));
/* Read into the tail area of inbuf for as long as the underlying stream
   returns a positive byte count. */
60 while ((ret = silc_stream_read(bs->stream, bs->inbuf->tail,
61 silc_buffer_taillen(bs->inbuf))) > 0) {
/* Empty buffer that the parse loop below passes to SILC_STR_BUFFER_ALLOC. */
63 buffer = silc_buffer_alloc(0);
/* Move the tail of inbuf forward over the `ret' bytes just read. */
68 silc_buffer_pull_tail(bs->inbuf, ret);
70 /* Parse the buffer */
71 while ((len = silc_buffer_unformat(bs->inbuf,
72 SILC_STR_BUFFER_ALLOC(buffer),
74 /* Deliver the buffer */
75 SILC_LOG_HEXDUMP(("Received buffer, size %d",
76 silc_buffer_len(buffer)),
77 silc_buffer_data(buffer), silc_buffer_len(buffer));
78 bs->receiver(SILC_OK, (SilcStream)bs, buffer, bs->context);
/* Drop the `len' bytes that were just parsed from the front of inbuf. */
79 silc_buffer_pull(bs->inbuf, len);
/* Allocate a new buffer for the next round.  Presumably the receiver now
   owns the delivered buffer, since the pointer is overwritten here without
   a free -- TODO confirm against the receiver callback contract. */
81 buffer = silc_buffer_alloc(0);
86 if (silc_buffer_len(bs->inbuf) > 0) {
87 /* Not complete buffer, read more data */
/* With at least 4 bytes available, read a 32-bit MSB-first value from the
   start of the data into buf_len; the log message below reports it as the
   buffer size. */
89 if (silc_buffer_len(bs->inbuf) >= 4) {
90 SILC_GET32_MSB(buf_len, bs->inbuf->data);
91 SILC_LOG_DEBUG(("Incomplete buffer, wait for rest, buffer size %d",
/* NOTE(review): the result of silc_buffer_realloc is not used on the
   visible lines -- confirm that an allocation failure is handled. */
95 /* Enlarge inbuf if needed */
96 if (silc_buffer_taillen(bs->inbuf) < buf_len)
97 silc_buffer_realloc(bs->inbuf, silc_buffer_truelen(bs->inbuf) +
102 /* All data read, read more */
103 silc_buffer_reset(bs->inbuf);
/* Free the most recently allocated buffer, which was not handed to the
   receiver. */
106 silc_buffer_free(buffer);
/* A final read result of 0 or -2 is reported to the receiver with
   silc_errno and a NULL buffer (presumably end of stream and error
   respectively -- confirm against the SilcStream API). */
108 if (ret == 0 || ret == -2) {
109 bs->receiver(silc_errno, (SilcStream)bs, NULL, bs->context);
113 /* Write any pending data */
114 SILC_LOG_DEBUG(("Write pending data to buffer stream %p", bs));
/* Keep writing while the queue still has data.  The results 0, -2 and -1
   are tested separately (-1 is logged as "would block"); otherwise the
   `ret' bytes that were written are consumed from the front of the queue. */
116 while (silc_buffer_len(&bs->queue) > 0) {
117 ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
118 silc_buffer_len(&bs->queue));
119 if (silc_unlikely(ret == 0))
122 if (silc_unlikely(ret == -2))
125 if (silc_unlikely(ret == -1)) {
126 SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
131 silc_buffer_pull(&bs->queue, ret);
/* The queue was written completely: zero the queue descriptor and reset
   outbuf. */
134 memset(&bs->queue, 0, sizeof(bs->queue));
135 silc_buffer_reset(bs->outbuf);
139 /****************************** Public API **********************************/
141 /* Create buffer stream */
/* Creates a buffer stream on top of `stream'.  Both `stream' and `receiver'
   must be non-NULL, otherwise SILC_ERR_INVALID_ARGUMENT is set.  The new
   context is zero-allocated, tagged with the buffer stream operations
   table, and given an input buffer allocated with size 32 and an output
   buffer allocated with size 0.  `receiver' and `context' are stored for
   silc_buffer_stream_io, which is registered as the notifier of the
   underlying stream using that stream's own schedule.  Returns the new
   context cast to SilcStream.
   NOTE(review): the failure return values and the assignment of `stream'
   to bs->stream are on lines not visible in this excerpt. */
143 SilcStream silc_buffer_stream_create(SilcStream stream,
144 SilcBufferReceiveCallback receiver,
149 if (!stream || !receiver) {
150 silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
154 bs = silc_calloc(1, sizeof(*bs));
158 SILC_LOG_DEBUG(("Created new buffer stream %p", bs));
160 bs->ops = &silc_buffer_stream_ops;
162 bs->receiver = receiver;
163 bs->context = context;
164 bs->inbuf = silc_buffer_alloc(32);
165 bs->outbuf = silc_buffer_alloc(0);
166 if (!bs->inbuf || !bs->outbuf) {
/* silc_buffer_free is called on both buffers here, including whichever of
   them is NULL. */
167 silc_buffer_free(bs->inbuf);
168 silc_buffer_free(bs->outbuf);
173 /* Set IO callback to the underlying stream */
174 silc_stream_set_notifier(bs->stream,
175 silc_stream_get_schedule(bs->stream),
176 silc_buffer_stream_io, bs);
178 return (SilcStream)bs;
181 /* Send buffer to stream */
/* Formats `buffer' into the output buffer of the buffer stream `stream' and
   then tries to write the queued data to the underlying stream
   immediately.  Sets SILC_ERR_INVALID_ARGUMENT when `stream' is not a
   buffer stream or `buffer' is NULL, and SILC_ERR_NOT_VALID when the
   stream is marked closed.
   NOTE(review): the return statements are not visible in this excerpt, so
   the value returned on each path should be confirmed in the full file. */
183 SilcBool silc_buffer_stream_send(SilcStream stream,
186 SilcBufferStream bs = stream;
/* NOTE(review): `buffer' is passed to silc_buffer_len and silc_buffer_data
   here, before the NULL check further down.  Whether these arguments are
   evaluated depends on how SILC_LOG_HEXDUMP is compiled -- confirm that a
   NULL `buffer' cannot be dereferenced. */
189 SILC_LOG_HEXDUMP(("Send to buffer stream %p %d bytes", bs,
190 silc_buffer_len(buffer)),
191 silc_buffer_data(buffer), silc_buffer_len(buffer));
193 if (silc_unlikely(!SILC_IS_BUFFER_STREAM(bs))) {
194 silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
198 if (silc_unlikely(!buffer)) {
199 silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
203 if (silc_unlikely(bs->closed)) {
204 SILC_LOG_DEBUG(("Buffer stream %p is closed", bs));
205 silc_set_errno(SILC_ERR_NOT_VALID);
210 if (silc_buffer_format(bs->outbuf,
212 SILC_STR_BUFFER(buffer),
/* Re-point the queue at the memory of outbuf: the previous head length of
   the queue is kept as the offset of its data pointer, and its data area
   ends at outbuf->data.  Presumably this is redone on every send because
   formatting may move the memory of outbuf -- TODO confirm. */
216 ret = silc_buffer_headlen(&bs->queue);
217 bs->queue.head = bs->outbuf->head;
218 bs->queue.data = bs->queue.head + ret;
219 bs->queue.tail = bs->outbuf->data;
220 bs->queue.end = bs->outbuf->end;
/* The loop below tests the write results 0, -2 and -1 separately (-1 is
   logged as "would block"); otherwise the `ret' bytes that were written
   are consumed from the front of the queue. */
222 /* Write the queue buffer */
223 while (silc_buffer_len(&bs->queue) > 0) {
224 ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
225 silc_buffer_len(&bs->queue));
226 if (silc_unlikely(ret == 0))
229 if (silc_unlikely(ret == -2))
232 if (silc_unlikely(ret == -1)) {
233 SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
238 silc_buffer_pull(&bs->queue, ret);
/* The queue was written completely: zero the queue descriptor and reset
   outbuf. */
241 memset(&bs->queue, 0, sizeof(bs->queue));
242 silc_buffer_reset(bs->outbuf);
244 SILC_LOG_DEBUG(("Buffer sent to buffer stream %p", bs));
249 /******************************* Stream API *********************************/
/* `read' entry of the buffer stream operations table.  The visible code
   only logs an error saying that silc_stream_read cannot be used with
   buffer streams.
   NOTE(review): the remaining parameters and the return statement are not
   visible in this excerpt. */
251 int silc_buffer_stream_read(SilcStream stream, unsigned char *buf,
254 SILC_LOG_ERROR(("The silc_stream_read cannot be used with buffer streams"));
/* `write' entry of the buffer stream operations table.  The visible code
   only logs an error directing callers to silc_buffer_stream_send.
   NOTE(review): the remaining parameters and the return statement are not
   visible in this excerpt. */
258 int silc_buffer_stream_write(SilcStream stream, const unsigned char *data,
261 SILC_LOG_ERROR(("Use silc_buffer_stream_send with buffer streams"));
/* `close' entry of the buffer stream operations table.  Logs the call and
   sets the notifier of the underlying stream to a NULL callback with a NULL
   context, using that stream's own schedule.
   NOTE(review): other statements of this function, including its return
   value and any update of the `closed' flag, are not visible in this
   excerpt. */
265 SilcBool silc_buffer_stream_close(SilcStream stream)
267 SilcBufferStream bs = stream;
269 SILC_LOG_DEBUG(("Closing buffer stream %p", bs));
272 silc_stream_set_notifier(bs->stream,
273 silc_stream_get_schedule(bs->stream), NULL, NULL);
/* `destroy' entry of the buffer stream operations table.  Calls
   silc_buffer_stream_close on the stream and then frees its output and
   input buffers.
   NOTE(review): releasing the context itself and the underlying stream is
   not visible in this excerpt -- confirm in the full file. */
278 void silc_buffer_stream_destroy(SilcStream stream)
280 SilcBufferStream bs = stream;
282 SILC_LOG_DEBUG(("Destroying buffer stream %p", bs));
284 silc_buffer_stream_close(stream);
285 silc_buffer_free(bs->outbuf);
286 silc_buffer_free(bs->inbuf);
/* `notifier' entry of the buffer stream operations table.  The visible code
   only logs an error saying that silc_stream_set_notifier cannot be used
   with this stream type.
   NOTE(review): the last parameter, the rest of the log message and the
   return statement are not visible in this excerpt. */
290 SilcBool silc_buffer_stream_notifier(SilcStream stream,
291 SilcSchedule schedule,
292 SilcStreamNotifier callback,
295 SILC_LOG_ERROR(("The silc_stream_set_notifier cannot be used with "
/* `get_schedule' entry of the buffer stream operations table.  Returns the
   schedule reported by silc_stream_get_schedule for the underlying stream.
   `stream' is used as a buffer stream without any check. */
300 SilcSchedule silc_buffer_stream_get_schedule(SilcStream stream)
302 SilcBufferStream bs = stream;
303 return silc_stream_get_schedule(bs->stream);
306 /* Buffer stream operations */
/* Positional initializer listing the read, write, close, destroy, notifier
   and get_schedule functions, in that order.  This is the table that
   silc_buffer_stream_create stores in bs->ops and that
   SILC_IS_BUFFER_STREAM compares against.
   NOTE(review): the order has to match the member order of SilcStreamOps,
   which is declared outside this file -- confirm when changing either. */
307 const SilcStreamOps silc_buffer_stream_ops =
309 silc_buffer_stream_read,
310 silc_buffer_stream_write,
311 silc_buffer_stream_close,
312 silc_buffer_stream_destroy,
313 silc_buffer_stream_notifier,
314 silc_buffer_stream_get_schedule