Author: Pekka Riikonen <priikone@silcnet.org>
- Copyright (C) 1997 - 2005 Pekka Riikonen
+ Copyright (C) 1997 - 2006 Pekka Riikonen
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
SilcStream stream; /* Underlaying stream */
SilcMutex lock; /* Stream lock */
SilcDList process; /* Packet processors, it set */
- SilcHashTable streamers; /* Valid if streamers exist */
void *stream_context; /* Stream context */
SilcBufferStruct inbuf; /* In buffer */
SilcBufferStruct outbuf; /* Out buffer */
break;
case SILC_STREAM_CAN_READ:
- /* Packet receiving can only happen in one thread, so locking is not
- required in packet receiving procedure. */
- silc_mutex_unlock(ps->lock);
-
SILC_LOG_DEBUG(("Reading data from stream"));
/* Make sure we have fair amount of free space in inbuf */
if (silc_buffer_taillen(&ps->inbuf) < SILC_PACKET_DEFAULT_SIZE)
if (!silc_buffer_realloc(&ps->inbuf, silc_buffer_truelen(&ps->inbuf) +
- SILC_PACKET_DEFAULT_SIZE * 2))
+ SILC_PACKET_DEFAULT_SIZE * 2)) {
+ silc_mutex_unlock(ps->lock);
return;
+ }
/* Read data from stream */
ret = silc_stream_read(ps->stream, ps->inbuf.tail,
if (ret == 0) {
/* EOS */
silc_buffer_reset(&ps->inbuf);
+ silc_mutex_unlock(ps->lock);
SILC_PACKET_CALLBACK_EOS(ps);
return;
}
if (ret == -2) {
/* Error */
silc_buffer_reset(&ps->inbuf);
+ silc_mutex_unlock(ps->lock);
SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
return;
}
if (ret == -1) {
/* Cannot read now, do it later. */
silc_buffer_pull(&ps->inbuf, silc_buffer_len(&ps->inbuf));
+ silc_mutex_unlock(ps->lock);
return;
}
- /* Read some data */
- silc_buffer_pull_tail(&ps->inbuf, ret);
-
/* Now process the data */
+ silc_buffer_pull_tail(&ps->inbuf, ret);
silc_packet_read_process(ps);
+ silc_mutex_unlock(ps->lock);
break;
default:
/* Clear and free buffers */
silc_buffer_clear(&stream->inbuf);
silc_buffer_clear(&stream->outbuf);
- silc_free(silc_buffer_steal(&stream->inbuf, NULL));
- silc_free(silc_buffer_steal(&stream->outbuf, NULL));
-
- silc_dlist_uninit(stream->process);
+ silc_buffer_purge(&stream->inbuf);
+ silc_buffer_purge(&stream->outbuf);
/* XXX */
+ /* Destroy the underlaying stream */
+ silc_stream_destroy(stream->stream);
+
+ silc_dlist_uninit(stream->process);
+ silc_mutex_free(stream->lock);
silc_free(stream);
}
stream->is_router = TRUE;
}
+
/* Links `callbacks' to `stream' for specified packet types */
-SilcBool silc_packet_stream_link(SilcPacketStream stream,
- SilcPacketCallbacks *callbacks,
- void *callback_context,
- int priority, ...)
+static SilcBool silc_packet_stream_link_va(SilcPacketStream stream,
+ SilcPacketCallbacks *callbacks,
+ void *callback_context,
+ int priority, va_list ap)
{
- va_list ap;
SilcPacketProcess p, e;
SilcInt32 packet_type;
int i;
if (!stream->process) {
stream->process = silc_dlist_init();
- if (!stream->process)
+ if (!stream->process) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
}
/* According to priority set the procesor to correct position. First
if (!e)
silc_dlist_add(stream->process, p);
- silc_mutex_unlock(stream->lock);
-
/* Get packet types to process */
- va_start(ap, priority);
i = 1;
while (1) {
packet_type = va_arg(ap, SilcInt32);
break;
p->types = silc_realloc(p->types, sizeof(*p->types) * (i + 1));
- if (!p->types)
+ if (!p->types) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
p->types[i - 1] = (SilcPacketType)packet_type;
i++;
}
if (p->types)
p->types[i - 1] = 0;
- va_end(ap);
+
+ silc_mutex_unlock(stream->lock);
silc_packet_stream_ref(stream);
return TRUE;
}
+/* Links `callbacks' to `stream' for specified packet types */
+
+SilcBool silc_packet_stream_link(SilcPacketStream stream,
+ SilcPacketCallbacks *callbacks,
+ void *callback_context,
+ int priority, ...)
+{
+ va_list ap;
+ SilcBool ret;
+
+ va_start(ap, priority);
+ ret = silc_packet_stream_link_va(stream, callbacks, callback_context,
+ priority, ap);
+ va_end(ap);
+
+ return ret;
+}
+
/* Unlinks `callbacks' from `stream'. */
void silc_packet_stream_unlink(SilcPacketStream stream,
void silc_packet_set_context(SilcPacketStream stream, void *stream_context)
{
+ silc_mutex_lock(stream->lock);
stream->stream_context = stream_context;
+ silc_mutex_unlock(stream->lock);
}
/* Return application context from packet stream */
void *silc_packet_get_context(SilcPacketStream stream)
{
- return stream->stream_context;
+ void *context;
+ silc_mutex_lock(stream->lock);
+ context = stream->stream_context;
+ silc_mutex_unlock(stream->lock);
+ return context;
}
/* Return underlaying stream */
SilcCipher receive)
{
SILC_LOG_DEBUG(("Setting new ciphers to packet stream"));
+ silc_mutex_lock(stream->lock);
stream->send_key = send;
stream->receive_key = receive;
+ silc_mutex_unlock(stream->lock);
}
/* Return current ciphers from packet stream */
if (!stream->send_key && !stream->receive_key)
return FALSE;
+ silc_mutex_lock(stream->lock);
+
if (send)
*send = stream->send_key;
if (receive)
*receive = stream->receive_key;
+ silc_mutex_unlock(stream->lock);
+
return TRUE;
}
SilcHmac receive)
{
SILC_LOG_DEBUG(("Setting new HMACs to packet stream"));
+ silc_mutex_lock(stream->lock);
stream->send_hmac = send;
stream->receive_hmac = receive;
+ silc_mutex_unlock(stream->lock);
}
/* Return current HMACs from packet stream */
if (!stream->send_hmac && !stream->receive_hmac)
return FALSE;
+ silc_mutex_lock(stream->lock);
+
if (send)
*send = stream->send_hmac;
if (receive)
*receive = stream->receive_hmac;
+ silc_mutex_unlock(stream->lock);
+
return TRUE;
}
SILC_LOG_DEBUG(("Setting new IDs to packet stream"));
+ silc_mutex_lock(stream->lock);
+
if (src_id) {
silc_free(stream->src_id);
- if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len))
+ if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len)) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
stream->src_id = silc_memdup(tmp, len);
- if (!stream->src_id)
+ if (!stream->src_id) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
stream->src_id_type = src_id_type;
stream->src_id_len = len;
}
if (dst_id) {
silc_free(stream->dst_id);
- if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len))
+ if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len)) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
stream->dst_id = silc_memdup(tmp, len);
- if (!stream->dst_id)
+ if (!stream->dst_id) {
+ silc_mutex_unlock(stream->lock);
return FALSE;
+ }
stream->dst_id_type = dst_id_type;
stream->dst_id_len = len;
}
+ silc_mutex_unlock(stream->lock);
+
return TRUE;
}
silc_mutex_unlock(stream->engine->lock);
}
-/* Creates streamer */
-
-SilcStream silc_packet_streamer_create(SilcPacketStream stream,
- SilcPacketType packet_type,
- SilcPacketFlags packet_flags)
-{
- /* XXX TODO */
- return NULL;
-}
-
-/* Destroyes streamer */
-
-void silc_packet_streamer_destroy(SilcStream stream)
-{
-
-}
-
-
/****************************** Packet Sending ******************************/
/* Prepare outgoing data buffer for packet sending. Returns the
packet.data, silc_buffer_len(&packet));
/* Encrypt the packet */
- if (cipher)
+ if (cipher) {
+ SILC_LOG_DEBUG(("Encrypting packet"));
if (!silc_cipher_encrypt(cipher, packet.data, packet.data,
enclen, NULL)) {
SILC_LOG_ERROR(("Packet encryption failed"));
silc_mutex_unlock(stream->lock);
return FALSE;
}
+ }
/* Compute HMAC */
if (hmac) {
return FALSE;
return silc_packet_send_raw(stream, type, flags,
- src_id_type,
- src_id_data,
- src_id_len,
- dst_id_type,
- dst_id_data,
- dst_id_len,
+ src_id ? src_id_type : stream->src_id_type,
+ src_id ? src_id_data : stream->src_id,
+ src_id ? src_id_len : stream->src_id_len,
+ dst_id ? dst_id_type : stream->dst_id_type,
+ dst_id ? dst_id_data : stream->dst_id,
+ dst_id ? dst_id_len : stream->dst_id_len,
data, data_len,
- cipher,
- hmac);
+ cipher ? cipher : stream->send_key,
+ hmac ? hmac : stream->send_hmac);
}
return TRUE;
}
-/* Dispatch packet to application */
+/* Dispatch packet to application. Called with stream->lock locked. */
static void silc_packet_dispatch(SilcPacket packet)
{
/* Parse the packet */
if (!silc_packet_parse(packet)) {
+ silc_mutex_unlock(packet->stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MALFORMED);
+ silc_mutex_lock(packet->stream->lock);
silc_packet_free(packet);
return;
}
if (!stream->process) {
/* Send to default processor as no others exist */
SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
+ silc_mutex_unlock(packet->stream->lock);
if (!stream->engine->callbacks->
packet_receive(stream->engine, stream, packet,
stream->engine->callback_context,
stream->stream_context))
silc_packet_free(packet);
+ silc_mutex_lock(packet->stream->lock);
return;
}
if (!default_sent && p->priority <= 0) {
SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
default_sent = TRUE;
+ silc_mutex_unlock(packet->stream->lock);
if (stream->engine->callbacks->
packet_receive(stream->engine, stream, packet,
stream->engine->callback_context,
stream->stream_context)) {
+ silc_mutex_lock(packet->stream->lock);
return;
}
+ silc_mutex_lock(packet->stream->lock);
}
/* Send to processor */
if (!p->types) {
/* Send all packet types */
SILC_LOG_DEBUG(("Dispatching packet to %p callbacks", p->callbacks));
+ silc_mutex_unlock(packet->stream->lock);
if (p->callbacks->packet_receive(stream->engine, stream, packet,
p->callback_context,
- stream->stream_context))
+ stream->stream_context)) {
+ silc_mutex_lock(packet->stream->lock);
return;
+ }
+ silc_mutex_lock(packet->stream->lock);
} else {
/* Send specific types */
- for (pt = p->types; *pt; pt++)
- if (*pt == packet->type) {
- SILC_LOG_DEBUG(("Dispatching packet to %p callbacks",
- p->callbacks));
- if (p->callbacks->packet_receive(stream->engine, stream, packet,
- p->callback_context,
- stream->stream_context))
- return;
- break;
+ for (pt = p->types; *pt; pt++) {
+ if (*pt != packet->type)
+ continue;
+ SILC_LOG_DEBUG(("Dispatching packet to %p callbacks", p->callbacks));
+ silc_mutex_unlock(packet->stream->lock);
+ if (p->callbacks->packet_receive(stream->engine, stream, packet,
+ p->callback_context,
+ stream->stream_context)) {
+ silc_mutex_lock(packet->stream->lock);
+ return;
}
+ silc_mutex_lock(packet->stream->lock);
+ break;
+ }
}
}
if (!default_sent) {
/* Send to default processor as it has not been sent yet */
SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
+ silc_mutex_unlock(packet->stream->lock);
if (stream->engine->callbacks->
packet_receive(stream->engine, stream, packet,
stream->engine->callback_context,
- stream->stream_context))
+ stream->stream_context)) {
+ silc_mutex_lock(packet->stream->lock);
return;
+ }
+ silc_mutex_lock(packet->stream->lock);
}
/* If we got here, no one wanted the packet, so drop it */
silc_packet_free(packet);
}
-/* Process incoming data and parse packets. */
+/* Process incoming data and parse packets. Called with stream->lock
+ locked. */
static void silc_packet_read_process(SilcPacketStream stream)
{
/* Sanity checks */
if (packetlen < SILC_PACKET_MIN_LEN) {
SILC_LOG_ERROR(("Received too short packet"));
+ silc_mutex_unlock(stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MALFORMED);
+ silc_mutex_lock(stream->lock);
memset(tmp, 0, sizeof(tmp));
silc_buffer_reset(&stream->inbuf);
return;
if (!silc_packet_check_mac(stream->receive_hmac, stream->inbuf.data,
paddedlen, stream->inbuf.data + paddedlen,
stream->receive_psn)) {
+ silc_mutex_unlock(stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MAC_FAILED);
+ silc_mutex_lock(stream->lock);
memset(tmp, 0, sizeof(tmp));
silc_buffer_reset(&stream->inbuf);
return;
/* Get packet */
packet = silc_packet_alloc(stream->engine);
if (!packet) {
+ silc_mutex_unlock(stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_NO_MEMORY);
+ silc_mutex_lock(stream->lock);
memset(tmp, 0, sizeof(tmp));
silc_buffer_reset(&stream->inbuf);
return;
silc_buffer_truelen(&packet->buffer) +
(paddedlen -
silc_buffer_truelen(&packet->buffer)))) {
+ silc_mutex_unlock(stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_NO_MEMORY);
+ silc_mutex_lock(stream->lock);
silc_packet_free(packet);
memset(tmp, 0, sizeof(tmp));
silc_buffer_reset(&stream->inbuf);
ret = silc_packet_decrypt(stream->receive_key, stream->receive_hmac,
stream->receive_psn, &packet->buffer, normal);
if (ret < 0) {
+ silc_mutex_unlock(stream->lock);
SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_DECRYPTION_FAILED);
+ silc_mutex_lock(stream->lock);
silc_packet_free(packet);
memset(tmp, 0, sizeof(tmp));
return;
silc_buffer_reset(&stream->inbuf);
}
+
+
+/****************************** Packet Waiting ******************************/
+
+/* Packet wait receive callback */
+static SilcBool
+silc_packet_wait_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context);
+
+/* Packet waiting callbacks */
+static SilcPacketCallbacks silc_packet_wait_cbs =
+{
+ silc_packet_wait_packet_receive, NULL, NULL
+};
+
+/* Packet waiting context */
+typedef struct {
+ SilcMutex wait_lock;
+ SilcCond wait_cond;
+ SilcList packet_queue;
+ unsigned int stopped : 1;
+} *SilcPacketWait;
+
+/* Packet wait receive callback */
+
+static SilcBool
+silc_packet_wait_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context)
+{
+ SilcPacketWait pw = callback_context;
+
+ /* Signal the waiting thread for a new packet */
+ silc_mutex_lock(pw->wait_lock);
+
+ if (pw->stopped) {
+ silc_mutex_unlock(pw->wait_lock);
+ return FALSE;
+ }
+
+ silc_list_add(pw->packet_queue, packet);
+ silc_cond_broadcast(pw->wait_cond);
+
+ silc_mutex_unlock(pw->wait_lock);
+
+ return TRUE;
+}
+
+/* Initialize packet waiting */
+
+void *silc_packet_wait_init(SilcPacketStream stream, ...)
+{
+ SilcPacketWait pw;
+ SilcBool ret;
+ va_list ap;
+
+ pw = silc_calloc(1, sizeof(*pw));
+ if (!pw)
+ return NULL;
+
+ /* Allocate mutex and conditional variable */
+ if (!silc_mutex_alloc(&pw->wait_lock)) {
+ silc_free(pw);
+ return NULL;
+ }
+ if (!silc_cond_alloc(&pw->wait_cond)) {
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+ return NULL;
+ }
+
+ /* Link to the packet stream for the requested packet types */
+ va_start(ap, stream);
+ ret = silc_packet_stream_link_va(stream, &silc_packet_wait_cbs, pw,
+ 10000000, ap);
+ va_end(ap);
+ if (!ret) {
+ silc_cond_free(pw->wait_cond);
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+ return NULL;
+ }
+
+ /* Initialize packet queue */
+ silc_list_init(pw->packet_queue, struct SilcPacketStruct, next);
+
+ return (void *)pw;
+}
+
+/* Uninitialize packet waiting */
+
+void silc_packet_wait_uninit(void *waiter, SilcPacketStream stream)
+{
+ SilcPacketWait pw = waiter;
+ SilcPacket packet;
+
+ /* Signal any threads to stop waiting */
+ silc_mutex_lock(pw->wait_lock);
+ pw->stopped = TRUE;
+ silc_cond_broadcast(pw->wait_cond);
+ silc_mutex_unlock(pw->wait_lock);
+
+ /* Re-acquire lock and free resources */
+ silc_mutex_lock(pw->wait_lock);
+ silc_packet_stream_unlink(stream, &silc_packet_wait_cbs, pw);
+
+ /* Free any remaining packets */
+ silc_list_start(pw->packet_queue);
+ while ((packet = silc_list_get(pw->packet_queue)) != SILC_LIST_END)
+ silc_packet_free(packet);
+
+ silc_mutex_unlock(pw->wait_lock);
+ silc_cond_free(pw->wait_cond);
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+}
+
+/* Blocks thread until a packet has been received. */
+
+int silc_packet_wait(void *waiter, int timeout, SilcPacket *return_packet)
+{
+ SilcPacketWait pw = waiter;
+ SilcBool ret = FALSE;
+
+ silc_mutex_lock(pw->wait_lock);
+
+ /* Wait here until packet has arrived */
+ while (silc_list_count(pw->packet_queue) == 0) {
+ if (pw->stopped) {
+ silc_mutex_unlock(pw->wait_lock);
+ return -1;
+ }
+ ret = silc_cond_timedwait(pw->wait_cond, pw->wait_lock, timeout);
+ }
+
+ /* Return packet */
+ silc_list_start(pw->packet_queue);
+ *return_packet = silc_list_get(pw->packet_queue);
+ silc_list_del(pw->packet_queue, *return_packet);
+
+ silc_mutex_unlock(pw->wait_lock);
+
+ return ret == TRUE ? 1 : 0;
+}