From 4ad81546bc7519f1022850ac8561d365e0a60b20 Mon Sep 17 00:00:00 2001 From: Pekka Riikonen Date: Tue, 23 Jan 2007 14:51:31 +0000 Subject: [PATCH] Added blocking support for wrapped packet stream. --- lib/silccore/silcpacket.c | 112 ++++++++++++++++++++++++++++++-------- lib/silccore/silcpacket.h | 23 +++++--- 2 files changed, 105 insertions(+), 30 deletions(-) diff --git a/lib/silccore/silcpacket.c b/lib/silccore/silcpacket.c index 55447f3d..f6b4c87f 100644 --- a/lib/silccore/silcpacket.c +++ b/lib/silccore/silcpacket.c @@ -4,7 +4,7 @@ Author: Pekka Riikonen - Copyright (C) 1997 - 2006 Pekka Riikonen + Copyright (C) 1997 - 2007 Pekka Riikonen This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -67,7 +67,7 @@ struct SilcPacketStreamStruct { struct SilcPacketStreamStruct *next; SilcPacketEngineContext sc; /* Per scheduler context */ SilcStream stream; /* Underlaying stream */ - SilcMutex lock; /* Stream lock */ + SilcMutex lock; /* Packet stream lock */ SilcDList process; /* Packet processors, or NULL */ SilcPacketRemoteUDP remote_udp; /* UDP remote stream tuple, or NULL */ void *stream_context; /* Stream context */ @@ -402,6 +402,8 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status, switch (status) { case SILC_STREAM_CAN_READ: + /* Reading is locked also with stream->lock because we may be reading + at the same time other thread is writing to same underlaying stream. */ SILC_LOG_DEBUG(("Reading data from stream")); /* Read data from stream */ @@ -667,10 +669,16 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, silc_hash_string_compare, NULL, silc_packet_engine_hash_destr, NULL, TRUE); + silc_mutex_unlock(engine->lock); /* Set IO notifier callback. This schedules this stream for I/O. */ - silc_stream_set_notifier(ps->stream, schedule, silc_packet_stream_io, ps); + if (!silc_stream_set_notifier(ps->stream, schedule, + silc_packet_stream_io, ps)) { + SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream")); + silc_packet_stream_destroy(ps); + return NULL; + } return ps; } @@ -769,8 +777,12 @@ void silc_packet_stream_destroy(SilcPacketStream stream) if (!stream) return; - if (silc_atomic_get_int8(&stream->refcnt) > 1) { + if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) { stream->destroyed = TRUE; + + /* Close the underlaying stream */ + if (!stream->udp && stream->stream) + silc_stream_close(stream->stream); return; } @@ -1001,14 +1013,22 @@ SilcBool silc_packet_get_sender(SilcPacket packet, void silc_packet_stream_ref(SilcPacketStream stream) { silc_atomic_add_int8(&stream->refcnt, 1); + SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream, + silc_atomic_get_int8(&stream->refcnt) - 1, + silc_atomic_get_int8(&stream->refcnt))); } /* Unreference packet stream */ void silc_packet_stream_unref(SilcPacketStream stream) { - if (silc_atomic_sub_int8(&stream->refcnt, 1) == 0) - silc_packet_stream_destroy(stream); + SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream, + silc_atomic_get_int8(&stream->refcnt), + silc_atomic_get_int8(&stream->refcnt) - 1)); + if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) + return; + silc_atomic_add_int8(&stream->refcnt, 1); + silc_packet_stream_destroy(stream); } /* Return engine */ @@ -2261,12 +2281,15 @@ const SilcStreamOps silc_packet_stream_ops; typedef struct { const SilcStreamOps *ops; SilcPacketStream stream; + SilcMutex lock; + void *waiter; /* Waiter context in blocking mode */ SilcStreamNotifier callback; void *context; SilcList in_queue; SilcPacketType type; SilcPacketFlags flags; unsigned int closed : 1; + unsigned int blocking : 1; } *SilcPacketWrapperStream; /* Packet wrapper callbacks */ @@ -2275,7 +2298,7 @@ static SilcPacketCallbacks silc_packet_wrap_cbs = silc_packet_wrap_packet_receive, NULL, NULL }; -/* Packet stream wrapper receive callback */ +/* Packet stream wrapper receive callback, non-blocking mode */ static SilcBool silc_packet_wrap_packet_receive(SilcPacketEngine engine, @@ -2289,7 +2312,9 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine, if (!pws->closed || !pws->callback) return FALSE; + silc_mutex_lock(pws->lock); silc_list_add(pws->in_queue, packet); + silc_mutex_unlock(pws->lock); /* Call notifier callback */ pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context); @@ -2308,12 +2333,26 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, if (pws->closed) return -2; - if (!silc_list_count(pws->in_queue)) - return -1; - silc_list_start(pws->in_queue); - packet = silc_list_get(pws->in_queue); - silc_list_del(pws->in_queue, packet); + if (pws->blocking) { + /* Block until packet is received */ + if ((silc_packet_wait(pws->waiter, 0, &packet)) < 0) + return -2; + if (pws->closed) + return -2; + } else { + /* Non-blocking mode */ + silc_mutex_lock(pws->lock); + if (!silc_list_count(pws->in_queue)) { + silc_mutex_unlock(pws->lock); + return -1; + } + + silc_list_start(pws->in_queue); + packet = silc_list_get(pws->in_queue); + silc_list_del(pws->in_queue, packet); + silc_mutex_unlock(pws->lock); + } len = silc_buffer_len(&packet->buffer); if (len > buf_len) @@ -2348,9 +2387,14 @@ SilcBool silc_packet_wrap_close(SilcStream stream) if (pws->closed) return TRUE; - /* Unlink */ - if (pws->callback) - silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws); + if (pws->blocking) { + /* Close packet waiter */ + silc_packet_wait_uninit(pws->waiter, pws->stream); + } else { + /* Unlink */ + if (pws->callback) + silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws); + } pws->closed = TRUE; return TRUE; @@ -2370,6 +2414,8 @@ void silc_packet_wrap_destroy(SilcStream stream) silc_list_start(pws->in_queue); while ((packet = silc_list_get(pws->in_queue))) silc_packet_free(packet); + if (pws->lock) + silc_mutex_free(pws->lock); silc_packet_stream_unref(pws->stream); silc_free(pws); @@ -2377,15 +2423,15 @@ void silc_packet_wrap_destroy(SilcStream stream) /* Link stream to receive packets */ -void silc_packet_wrap_notifier(SilcStream stream, - SilcSchedule schedule, - SilcStreamNotifier callback, - void *context) +SilcBool silc_packet_wrap_notifier(SilcStream stream, + SilcSchedule schedule, + SilcStreamNotifier callback, + void *context) { SilcPacketWrapperStream pws = stream; - if (pws->closed) - return; + if (pws->closed || pws->blocking) + return FALSE; /* Link to receive packets */ if (callback) @@ -2396,6 +2442,8 @@ void silc_packet_wrap_notifier(SilcStream stream, pws->callback = callback; pws->context = context; + + return TRUE; } /* Return schedule */ @@ -2409,7 +2457,8 @@ SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream) SilcStream silc_packet_stream_wrap(SilcPacketStream stream, SilcPacketType type, - SilcPacketFlags flags) + SilcPacketFlags flags, + SilcBool blocking_mode) { SilcPacketWrapperStream pws; @@ -2423,8 +2472,25 @@ SilcStream silc_packet_stream_wrap(SilcPacketStream stream, pws->stream = stream; pws->type = type; pws->flags = flags; + pws->blocking = blocking_mode; + + if (pws->blocking) { + /* Blocking mode. Use packet waiter to do the thing. */ + pws->waiter = silc_packet_wait_init(pws->stream, pws->type, -1); + if (!pws->waiter) { + silc_free(pws); + return NULL; + } + } else { + /* Non-blocking mode */ + if (!silc_mutex_alloc(&pws->lock)) { + silc_free(pws); + return NULL; + } + + silc_list_init(pws->in_queue, struct SilcPacketStruct, next); + } - silc_list_init(pws->in_queue, struct SilcPacketStruct, next); silc_packet_stream_ref(stream); return (SilcStream)pws; diff --git a/lib/silccore/silcpacket.h b/lib/silccore/silcpacket.h index 48a25793..7ddabff1 100644 --- a/lib/silccore/silcpacket.h +++ b/lib/silccore/silcpacket.h @@ -618,7 +618,8 @@ void silc_packet_stream_unlink(SilcPacketStream stream, * * SilcStream silc_packet_stream_wrap(SilcPacketStream stream, * SilcPacketType type, - * SilcPacketFlags flags); + * SilcPacketFlags flags, + * SilcBool blocking_mode); * * DESCRIPTION * @@ -630,11 +631,18 @@ void silc_packet_stream_unlink(SilcPacketStream stream, * stream can be destroyed by calling silc_stream_destroy. It does not * destroy the wrapped packet stream. * - * The silc_stream_set_notifier must be called before the returned stream - * can be used to receive packets. The SILC_STREAM_CAN_READ will be - * returned to the notifier callback to indicate that a packet is ready - * for reading. Calling silc_stream_read once returns one complete SILC - * packet data payload (which is of type of `type'). + * If the `blocking_mode' mode is TRUE then the silc_stream_read and + * silc_stream_write may block the calling process or thread until SILC + * packet is read or written. If it is FALSE the stream is in non-blocking + * mode and the calls never block. The returned stream is thread-safe and + * packets may be read and written in multi-threaded environment. + * + * In non-blocking mode the silc_stream_set_notifier must be called before + * the returned stream can be used to read packets. The stream status + * SILC_STREAM_CAN_READ will be returned to the notifier callback to + * indicate that a packet is ready for reading. Calling silc_stream_read + * once returns one complete SILC packet data payload (which is of type of + * `type'). * * The returned SilcStream can be used as any normal stream and all * SilcStream API functions may be used with the stream. This returns @@ -643,7 +651,8 @@ void silc_packet_stream_unlink(SilcPacketStream stream, ***/ SilcStream silc_packet_stream_wrap(SilcPacketStream stream, SilcPacketType type, - SilcPacketFlags flags); + SilcPacketFlags flags, + SilcBool blocking_mode); /****f* silccore/SilcPacketAPI/silc_packet_get_sender * -- 2.24.0