From: Pekka Riikonen Date: Wed, 14 Feb 2007 14:47:58 +0000 (+0000) Subject: Fixed packet wrapper stream API to support encoder/decoder X-Git-Tag: silc.client.1.1.beta1~18 X-Git-Url: http://git.silcnet.org/gitweb/?p=silc.git;a=commitdiff_plain;h=f0c131441402b9416aa0919996090750353919c2 Fixed packet wrapper stream API to support encoder/decoder for packets, and to handle partial reading correctly. Fixed also inbuf size checking in reading to have enough space before reading. --- diff --git a/lib/silccore/silcpacket.c b/lib/silccore/silcpacket.c index f6b4c87f..df8f0bf4 100644 --- a/lib/silccore/silcpacket.c +++ b/lib/silccore/silcpacket.c @@ -299,6 +299,11 @@ static inline SilcBool silc_packet_stream_read(SilcPacketStream ps, stream = ps->stream; inbuf = &ps->sc->inbuf; + /* Make sure there is enough room to read */ + if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf)) + silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) + + (SILC_PACKET_DEFAULT_SIZE * 2)); + if (silc_socket_stream_is_udp(stream, &connected)) { if (!connected) { /* Connectionless UDP stream, read one UDP packet */ @@ -404,7 +409,7 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status, case SILC_STREAM_CAN_READ: /* Reading is locked also with stream->lock because we may be reading at the same time other thread is writing to same underlaying stream. */ - SILC_LOG_DEBUG(("Reading data from stream")); + SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps)); /* Read data from stream */ if (!silc_packet_stream_read(ps, &remote)) @@ -423,7 +428,8 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status, break; case SILC_STREAM_CAN_WRITE: - SILC_LOG_DEBUG(("Writing pending data to stream")); + SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p", + ps->stream, ps)); if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) { silc_mutex_unlock(ps->lock); @@ -673,7 +679,7 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, silc_mutex_unlock(engine->lock); /* Set IO notifier callback. This schedules this stream for I/O. */ - if (!silc_stream_set_notifier(ps->stream, schedule, + if (!silc_stream_set_notifier(ps->stream, schedule, silc_packet_stream_io, ps)) { SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream")); silc_packet_stream_destroy(ps); @@ -2282,7 +2288,10 @@ typedef struct { const SilcStreamOps *ops; SilcPacketStream stream; SilcMutex lock; - void *waiter; /* Waiter context in blocking mode */ + void *waiter; /* Waiter context in blocking mode */ + SilcPacketWrapCoder coder; + void *coder_context; + SilcBuffer encbuf; SilcStreamNotifier callback; void *context; SilcList in_queue; @@ -2290,6 +2299,7 @@ typedef struct { SilcPacketFlags flags; unsigned int closed : 1; unsigned int blocking : 1; + unsigned int read_more : 1; } *SilcPacketWrapperStream; /* Packet wrapper callbacks */ @@ -2309,7 +2319,7 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine, { SilcPacketWrapperStream pws = callback_context; - if (!pws->closed || !pws->callback) + if (pws->closed || !pws->callback) return FALSE; silc_mutex_lock(pws->lock); @@ -2322,6 +2332,19 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine, return TRUE; } +/* Task callback to notify more data is available for reading */ + +SILC_TASK_CALLBACK(silc_packet_wrap_read_more) +{ + SilcPacketWrapperStream pws = context; + + if (pws->closed || !pws->callback) + return; + + /* Call notifier callback */ + pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context); +} + /* Read SILC packet */ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, @@ -2329,6 +2352,7 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, { SilcPacketWrapperStream pws = stream; SilcPacket packet; + SilcBool read_more = FALSE; int len; if (pws->closed) @@ -2354,12 +2378,31 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, silc_mutex_unlock(pws->lock); } + /* Call decoder if set */ + if (pws->coder && !pws->read_more) + pws->coder(stream, SILC_STREAM_CAN_READ, &packet->buffer, + pws->coder_context); + len = silc_buffer_len(&packet->buffer); - if (len > buf_len) + if (len > buf_len) { len = buf_len; + read_more = TRUE; + } + /* Read data */ memcpy(buf, packet->buffer.data, len); + if (read_more && !pws->blocking) { + /* More data will be available (in blocking mode not supported). */ + silc_buffer_pull(&packet->buffer, len); + silc_list_insert(pws->in_queue, NULL, packet); + silc_schedule_task_add_timeout(pws->stream->sc->schedule, + silc_packet_wrap_read_more, pws, 0, 0); + pws->read_more = TRUE; + return len; + } + + pws->read_more = FALSE; silc_packet_free(packet); return len; } @@ -2370,10 +2413,27 @@ int silc_packet_wrap_write(SilcStream stream, const unsigned char *data, SilcUInt32 data_len) { SilcPacketWrapperStream pws = stream; + SilcBool ret = FALSE; + + /* Call decoder if set */ + if (pws->coder) { + silc_buffer_reset(pws->encbuf); + ret = pws->coder(stream, SILC_STREAM_CAN_WRITE, pws->encbuf, + pws->coder_context); + } /* Send the SILC packet */ - if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len)) - return -2; + if (ret) { + if (!silc_packet_send_va(pws->stream, pws->type, pws->flags, + SILC_STR_DATA(silc_buffer_data(pws->encbuf), + silc_buffer_len(pws->encbuf)), + SILC_STR_DATA(data, data_len), + SILC_STR_END)) + return -2; + } else { + if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len)) + return -2; + } return data_len; } @@ -2416,6 +2476,8 @@ void silc_packet_wrap_destroy(SilcStream stream) silc_packet_free(packet); if (pws->lock) silc_mutex_free(pws->lock); + if (pws->encbuf) + silc_buffer_free(pws->encbuf); silc_packet_stream_unref(pws->stream); silc_free(pws); @@ -2458,7 +2520,9 @@ SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream) SilcStream silc_packet_stream_wrap(SilcPacketStream stream, SilcPacketType type, SilcPacketFlags flags, - SilcBool blocking_mode) + SilcBool blocking_mode, + SilcPacketWrapCoder coder, + void *context) { SilcPacketWrapperStream pws; @@ -2473,6 +2537,12 @@ SilcStream silc_packet_stream_wrap(SilcPacketStream stream, pws->type = type; pws->flags = flags; pws->blocking = blocking_mode; + pws->coder = coder; + pws->coder_context = context; + + /* Allocate small amount for encoder buffer. */ + if (pws->coder) + pws->encbuf = silc_buffer_alloc(8); if (pws->blocking) { /* Blocking mode. Use packet waiter to do the thing. */ diff --git a/lib/silccore/silcpacket.h b/lib/silccore/silcpacket.h index 7ddabff1..cef13ca2 100644 --- a/lib/silccore/silcpacket.h +++ b/lib/silccore/silcpacket.h @@ -114,9 +114,10 @@ typedef SilcUInt8 SilcPacketFlags; #define SILC_PACKET_FLAG_LIST 0x02 /* Packet is a list */ #define SILC_PACKET_FLAG_BROADCAST 0x04 /* Packet is a broadcast */ #define SILC_PACKET_FLAG_COMPRESSED 0x08 /* Payload is compressed */ +#define SILC_PACKET_FLAG_ACK 0x10 /* Acknowledge packet */ /* Impelemntation specific flags */ -#define SILC_PACKET_FLAG_LONG_PAD 0x10 /* Use maximum padding */ +#define SILC_PACKET_FLAG_LONG_PAD 0x12 /* Use maximum padding */ /***/ /****s* silccore/SilcPacketAPI/SilcPacketEngine @@ -612,6 +613,36 @@ void silc_packet_stream_unlink(SilcPacketStream stream, SilcPacketCallbacks *callbacks, void *callback_context); +/****f* silccore/SilcPacketAPI/SilcPacketWrapCoder + * + * SYNOPSIS + * + * typedef SilcBool (*SilcPacketWrapCoder)(SilcStream stream, + * SilcStreamStatus status, + * SilcBuffer buffer, + * void *context); + * + * DESCRIPTION + * + * The encoder/decoder callback for silc_packet_stream_wrap. If the + * `status' is SILC_STREAM_CAN_WRITE then additional data can be added + * to `buffer'. It is added before the data that is written with + * silc_stream_write. The silc_buffer_enlarge should be called to verify + * there is enough room in `buffer' before adding data to it. The `buffer' + * must not be freed. + * + * If the `status' is SILC_STREAM_CAN_READ then data from the `buffer' + * may be read before it is passed to readed when silc_stream_read is + * called. The `buffer' may be advanced also to hide data in it. + * + * This function returns FALSE in case of error. + * + ***/ +typedef SilcBool (*SilcPacketWrapCoder)(SilcStream stream, + SilcStreamStatus status, + SilcBuffer buffer, + void *context); + /****f* silccore/SilcPacketAPI/silc_packet_stream_wrap * * SYNOPSIS @@ -619,7 +650,9 @@ void silc_packet_stream_unlink(SilcPacketStream stream, * SilcStream silc_packet_stream_wrap(SilcPacketStream stream, * SilcPacketType type, * SilcPacketFlags flags, - * SilcBool blocking_mode); + * SilcBool blocking_mode, + * SilcPacketWrapCoder coder, + * void *context); * * DESCRIPTION * @@ -644,6 +677,12 @@ void silc_packet_stream_unlink(SilcPacketStream stream, * once returns one complete SILC packet data payload (which is of type of * `type'). * + * The `coder' is optional encoder/decoder callback which the packet engine + * will call if it is non-NULL. It can be used to encode additional data + * into each packet when silc_stream_write is called or decode data before + * it is passed to reader when silc_stream_read is called. The `context' + * is passed to `coder'. + * * The returned SilcStream can be used as any normal stream and all * SilcStream API functions may be used with the stream. This returns * NULL on error. @@ -652,7 +691,9 @@ void silc_packet_stream_unlink(SilcPacketStream stream, SilcStream silc_packet_stream_wrap(SilcPacketStream stream, SilcPacketType type, SilcPacketFlags flags, - SilcBool blocking_mode); + SilcBool blocking_mode, + SilcPacketWrapCoder coder, + void *context); /****f* silccore/SilcPacketAPI/silc_packet_get_sender *