Author: Pekka Riikonen <priikone@silcnet.org>
- Copyright (C) 1997 - 2006 Pekka Riikonen
+ Copyright (C) 1997 - 2007 Pekka Riikonen
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
struct SilcPacketStreamStruct *next;
SilcPacketEngineContext sc; /* Per scheduler context */
SilcStream stream; /* Underlaying stream */
- SilcMutex lock; /* Stream lock */
+ SilcMutex lock; /* Packet stream lock */
SilcDList process; /* Packet processors, or NULL */
SilcPacketRemoteUDP remote_udp; /* UDP remote stream tuple, or NULL */
void *stream_context; /* Stream context */
stream = ps->stream;
inbuf = &ps->sc->inbuf;
+ /* Make sure there is enough room to read */
+ if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf))
+ silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) +
+ (SILC_PACKET_DEFAULT_SIZE * 2));
+
if (silc_socket_stream_is_udp(stream, &connected)) {
if (!connected) {
/* Connectionless UDP stream, read one UDP packet */
switch (status) {
case SILC_STREAM_CAN_READ:
- SILC_LOG_DEBUG(("Reading data from stream"));
+ /* Reading is locked also with stream->lock because we may be reading
+ at the same time other thread is writing to same underlaying stream. */
+ SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps));
/* Read data from stream */
if (!silc_packet_stream_read(ps, &remote))
break;
case SILC_STREAM_CAN_WRITE:
- SILC_LOG_DEBUG(("Writing pending data to stream"));
+ SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p",
+ ps->stream, ps));
if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) {
silc_mutex_unlock(ps->lock);
silc_hash_string_compare, NULL,
silc_packet_engine_hash_destr,
NULL, TRUE);
+
silc_mutex_unlock(engine->lock);
/* Set IO notifier callback. This schedules this stream for I/O. */
- silc_stream_set_notifier(ps->stream, schedule, silc_packet_stream_io, ps);
+ if (!silc_stream_set_notifier(ps->stream, schedule,
+ silc_packet_stream_io, ps)) {
+ SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream"));
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
return ps;
}
if (!stream)
return;
- if (silc_atomic_get_int8(&stream->refcnt) > 1) {
+ if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) {
stream->destroyed = TRUE;
+
+ /* Close the underlaying stream */
+ if (!stream->udp && stream->stream)
+ silc_stream_close(stream->stream);
return;
}
void silc_packet_stream_ref(SilcPacketStream stream)
{
silc_atomic_add_int8(&stream->refcnt, 1);
+ SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
+ silc_atomic_get_int8(&stream->refcnt) - 1,
+ silc_atomic_get_int8(&stream->refcnt)));
}
/* Unreference packet stream */
void silc_packet_stream_unref(SilcPacketStream stream)
{
- if (silc_atomic_sub_int8(&stream->refcnt, 1) == 0)
- silc_packet_stream_destroy(stream);
+ SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
+ silc_atomic_get_int8(&stream->refcnt),
+ silc_atomic_get_int8(&stream->refcnt) - 1));
+ if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0)
+ return;
+ silc_atomic_add_int8(&stream->refcnt, 1);
+ silc_packet_stream_destroy(stream);
}
/* Return engine */
typedef struct {
const SilcStreamOps *ops;
SilcPacketStream stream;
+ SilcMutex lock;
+ void *waiter; /* Waiter context in blocking mode */
+ SilcPacketWrapCoder coder;
+ void *coder_context;
+ SilcBuffer encbuf;
SilcStreamNotifier callback;
void *context;
SilcList in_queue;
SilcPacketType type;
SilcPacketFlags flags;
unsigned int closed : 1;
+ unsigned int blocking : 1;
+ unsigned int read_more : 1;
} *SilcPacketWrapperStream;
/* Packet wrapper callbacks */
silc_packet_wrap_packet_receive, NULL, NULL
};
-/* Packet stream wrapper receive callback */
+/* Packet stream wrapper receive callback, non-blocking mode */
static SilcBool
silc_packet_wrap_packet_receive(SilcPacketEngine engine,
{
SilcPacketWrapperStream pws = callback_context;
- if (!pws->closed || !pws->callback)
+ if (pws->closed || !pws->callback)
return FALSE;
+ silc_mutex_lock(pws->lock);
silc_list_add(pws->in_queue, packet);
+ silc_mutex_unlock(pws->lock);
/* Call notifier callback */
pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
return TRUE;
}
+/* Task callback to notify more data is available for reading */
+
+SILC_TASK_CALLBACK(silc_packet_wrap_read_more)
+{
+ SilcPacketWrapperStream pws = context;
+
+ if (pws->closed || !pws->callback)
+ return;
+
+ /* Call notifier callback */
+ pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
+}
+
/* Read SILC packet */
int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
{
SilcPacketWrapperStream pws = stream;
SilcPacket packet;
+ SilcBool read_more = FALSE;
int len;
if (pws->closed)
return -2;
- if (!silc_list_count(pws->in_queue))
- return -1;
- silc_list_start(pws->in_queue);
- packet = silc_list_get(pws->in_queue);
- silc_list_del(pws->in_queue, packet);
+ if (pws->blocking) {
+ /* Block until packet is received */
+ if ((silc_packet_wait(pws->waiter, 0, &packet)) < 0)
+ return -2;
+ if (pws->closed)
+ return -2;
+ } else {
+ /* Non-blocking mode */
+ silc_mutex_lock(pws->lock);
+ if (!silc_list_count(pws->in_queue)) {
+ silc_mutex_unlock(pws->lock);
+ return -1;
+ }
+
+ silc_list_start(pws->in_queue);
+ packet = silc_list_get(pws->in_queue);
+ silc_list_del(pws->in_queue, packet);
+ silc_mutex_unlock(pws->lock);
+ }
+
+ /* Call decoder if set */
+ if (pws->coder && !pws->read_more)
+ pws->coder(stream, SILC_STREAM_CAN_READ, &packet->buffer,
+ pws->coder_context);
len = silc_buffer_len(&packet->buffer);
- if (len > buf_len)
+ if (len > buf_len) {
len = buf_len;
+ read_more = TRUE;
+ }
+ /* Read data */
memcpy(buf, packet->buffer.data, len);
+ if (read_more && !pws->blocking) {
+ /* More data will be available (in blocking mode not supported). */
+ silc_buffer_pull(&packet->buffer, len);
+ silc_list_insert(pws->in_queue, NULL, packet);
+ silc_schedule_task_add_timeout(pws->stream->sc->schedule,
+ silc_packet_wrap_read_more, pws, 0, 0);
+ pws->read_more = TRUE;
+ return len;
+ }
+
+ pws->read_more = FALSE;
silc_packet_free(packet);
return len;
}
SilcUInt32 data_len)
{
SilcPacketWrapperStream pws = stream;
+ SilcBool ret = FALSE;
+
+ /* Call decoder if set */
+ if (pws->coder) {
+ silc_buffer_reset(pws->encbuf);
+ ret = pws->coder(stream, SILC_STREAM_CAN_WRITE, pws->encbuf,
+ pws->coder_context);
+ }
/* Send the SILC packet */
- if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len))
- return -2;
+ if (ret) {
+ if (!silc_packet_send_va(pws->stream, pws->type, pws->flags,
+ SILC_STR_DATA(silc_buffer_data(pws->encbuf),
+ silc_buffer_len(pws->encbuf)),
+ SILC_STR_DATA(data, data_len),
+ SILC_STR_END))
+ return -2;
+ } else {
+ if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len))
+ return -2;
+ }
return data_len;
}
if (pws->closed)
return TRUE;
- /* Unlink */
- if (pws->callback)
- silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+ if (pws->blocking) {
+ /* Close packet waiter */
+ silc_packet_wait_uninit(pws->waiter, pws->stream);
+ } else {
+ /* Unlink */
+ if (pws->callback)
+ silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+ }
pws->closed = TRUE;
return TRUE;
silc_list_start(pws->in_queue);
while ((packet = silc_list_get(pws->in_queue)))
silc_packet_free(packet);
+ if (pws->lock)
+ silc_mutex_free(pws->lock);
+ if (pws->encbuf)
+ silc_buffer_free(pws->encbuf);
silc_packet_stream_unref(pws->stream);
silc_free(pws);
/* Link stream to receive packets */
-void silc_packet_wrap_notifier(SilcStream stream,
- SilcSchedule schedule,
- SilcStreamNotifier callback,
- void *context)
+SilcBool silc_packet_wrap_notifier(SilcStream stream,
+ SilcSchedule schedule,
+ SilcStreamNotifier callback,
+ void *context)
{
SilcPacketWrapperStream pws = stream;
- if (pws->closed)
- return;
+ if (pws->closed || pws->blocking)
+ return FALSE;
/* Link to receive packets */
if (callback)
pws->callback = callback;
pws->context = context;
+
+ return TRUE;
}
/* Return schedule */
SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
SilcPacketType type,
- SilcPacketFlags flags)
+ SilcPacketFlags flags,
+ SilcBool blocking_mode,
+ SilcPacketWrapCoder coder,
+ void *context)
{
SilcPacketWrapperStream pws;
pws->stream = stream;
pws->type = type;
pws->flags = flags;
+ pws->blocking = blocking_mode;
+ pws->coder = coder;
+ pws->coder_context = context;
+
+ /* Allocate small amount for encoder buffer. */
+ if (pws->coder)
+ pws->encbuf = silc_buffer_alloc(8);
+
+ if (pws->blocking) {
+ /* Blocking mode. Use packet waiter to do the thing. */
+ pws->waiter = silc_packet_wait_init(pws->stream, pws->type, -1);
+ if (!pws->waiter) {
+ silc_free(pws);
+ return NULL;
+ }
+ } else {
+ /* Non-blocking mode */
+ if (!silc_mutex_alloc(&pws->lock)) {
+ silc_free(pws);
+ return NULL;
+ }
+
+ silc_list_init(pws->in_queue, struct SilcPacketStruct, next);
+ }
- silc_list_init(pws->in_queue, struct SilcPacketStruct, next);
silc_packet_stream_ref(stream);
return (SilcStream)pws;