Added blocking support for wrapped packet stream.
authorPekka Riikonen <priikone@silcnet.org>
Tue, 23 Jan 2007 14:51:31 +0000 (14:51 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Tue, 23 Jan 2007 14:51:31 +0000 (14:51 +0000)
lib/silccore/silcpacket.c
lib/silccore/silcpacket.h

index 55447f3d93a6bf571ed05e66fecda6e2fbdbf4b4..f6b4c87ff7a587ffd285801456683f75aec0fc30 100644 (file)
@@ -4,7 +4,7 @@
 
   Author: Pekka Riikonen <priikone@silcnet.org>
 
-  Copyright (C) 1997 - 2006 Pekka Riikonen
+  Copyright (C) 1997 - 2007 Pekka Riikonen
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -67,7 +67,7 @@ struct SilcPacketStreamStruct {
   struct SilcPacketStreamStruct *next;
   SilcPacketEngineContext sc;           /* Per scheduler context */
   SilcStream stream;                    /* Underlaying stream */
-  SilcMutex lock;                       /* Stream lock */
+  SilcMutex lock;                       /* Packet stream lock */
   SilcDList process;                    /* Packet processors, or NULL */
   SilcPacketRemoteUDP remote_udp;       /* UDP remote stream tuple, or NULL */
   void *stream_context;                         /* Stream context */
@@ -402,6 +402,8 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
 
   switch (status) {
   case SILC_STREAM_CAN_READ:
+    /* Reading is locked also with stream->lock because we may be reading
+       at the same time other thread is writing to same underlaying stream. */
     SILC_LOG_DEBUG(("Reading data from stream"));
 
     /* Read data from stream */
@@ -667,10 +669,16 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine,
                                               silc_hash_string_compare, NULL,
                                               silc_packet_engine_hash_destr,
                                               NULL, TRUE);
+
   silc_mutex_unlock(engine->lock);
 
   /* Set IO notifier callback.  This schedules this stream for I/O. */
-  silc_stream_set_notifier(ps->stream, schedule, silc_packet_stream_io, ps);
+  if (!silc_stream_set_notifier(ps->stream, schedule, 
+                               silc_packet_stream_io, ps)) {
+    SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream"));
+    silc_packet_stream_destroy(ps);
+    return NULL;
+  }
 
   return ps;
 }
@@ -769,8 +777,12 @@ void silc_packet_stream_destroy(SilcPacketStream stream)
   if (!stream)
     return;
 
-  if (silc_atomic_get_int8(&stream->refcnt) > 1) {
+  if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) {
     stream->destroyed = TRUE;
+
+    /* Close the underlaying stream */
+    if (!stream->udp && stream->stream)
+      silc_stream_close(stream->stream);
     return;
   }
 
@@ -1001,14 +1013,22 @@ SilcBool silc_packet_get_sender(SilcPacket packet,
 void silc_packet_stream_ref(SilcPacketStream stream)
 {
   silc_atomic_add_int8(&stream->refcnt, 1);
+  SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
+                 silc_atomic_get_int8(&stream->refcnt) - 1,
+                 silc_atomic_get_int8(&stream->refcnt)));
 }
 
 /* Unreference packet stream */
 
 void silc_packet_stream_unref(SilcPacketStream stream)
 {
-  if (silc_atomic_sub_int8(&stream->refcnt, 1) == 0)
-    silc_packet_stream_destroy(stream);
+  SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
+                 silc_atomic_get_int8(&stream->refcnt),
+                 silc_atomic_get_int8(&stream->refcnt) - 1));
+  if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0)
+    return;
+  silc_atomic_add_int8(&stream->refcnt, 1);
+  silc_packet_stream_destroy(stream);
 }
 
 /* Return engine */
@@ -2261,12 +2281,15 @@ const SilcStreamOps silc_packet_stream_ops;
 typedef struct {
   const SilcStreamOps *ops;
   SilcPacketStream stream;
+  SilcMutex lock;
+  void *waiter;                        /* Waiter context in blocking mode */
   SilcStreamNotifier callback;
   void *context;
   SilcList in_queue;
   SilcPacketType type;
   SilcPacketFlags flags;
   unsigned int closed        : 1;
+  unsigned int blocking      : 1;
 } *SilcPacketWrapperStream;
 
 /* Packet wrapper callbacks */
@@ -2275,7 +2298,7 @@ static SilcPacketCallbacks silc_packet_wrap_cbs =
   silc_packet_wrap_packet_receive, NULL, NULL
 };
 
-/* Packet stream wrapper receive callback */
+/* Packet stream wrapper receive callback, non-blocking mode */
 
 static SilcBool
 silc_packet_wrap_packet_receive(SilcPacketEngine engine,
@@ -2289,7 +2312,9 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine,
   if (!pws->closed || !pws->callback)
     return FALSE;
 
+  silc_mutex_lock(pws->lock);
   silc_list_add(pws->in_queue, packet);
+  silc_mutex_unlock(pws->lock);
 
   /* Call notifier callback */
   pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
@@ -2308,12 +2333,26 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
 
   if (pws->closed)
     return -2;
-  if (!silc_list_count(pws->in_queue))
-    return -1;
 
-  silc_list_start(pws->in_queue);
-  packet = silc_list_get(pws->in_queue);
-  silc_list_del(pws->in_queue, packet);
+  if (pws->blocking) {
+    /* Block until packet is received */
+    if ((silc_packet_wait(pws->waiter, 0, &packet)) < 0)
+      return -2;
+    if (pws->closed)
+      return -2;
+  } else {
+    /* Non-blocking mode */
+    silc_mutex_lock(pws->lock);
+    if (!silc_list_count(pws->in_queue)) {
+      silc_mutex_unlock(pws->lock);
+      return -1;
+    }
+
+    silc_list_start(pws->in_queue);
+    packet = silc_list_get(pws->in_queue);
+    silc_list_del(pws->in_queue, packet);
+    silc_mutex_unlock(pws->lock);
+  }
 
   len = silc_buffer_len(&packet->buffer);
   if (len > buf_len)
@@ -2348,9 +2387,14 @@ SilcBool silc_packet_wrap_close(SilcStream stream)
   if (pws->closed)
     return TRUE;
 
-  /* Unlink */
-  if (pws->callback)
-    silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+  if (pws->blocking) {
+    /* Close packet waiter */
+    silc_packet_wait_uninit(pws->waiter, pws->stream);
+  } else {
+    /* Unlink */
+    if (pws->callback)
+      silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+  }
   pws->closed = TRUE;
 
   return TRUE;
@@ -2370,6 +2414,8 @@ void silc_packet_wrap_destroy(SilcStream stream)
   silc_list_start(pws->in_queue);
   while ((packet = silc_list_get(pws->in_queue)))
     silc_packet_free(packet);
+  if (pws->lock)
+    silc_mutex_free(pws->lock);
   silc_packet_stream_unref(pws->stream);
 
   silc_free(pws);
@@ -2377,15 +2423,15 @@ void silc_packet_wrap_destroy(SilcStream stream)
 
 /* Link stream to receive packets */
 
-void silc_packet_wrap_notifier(SilcStream stream,
-                              SilcSchedule schedule,
-                              SilcStreamNotifier callback,
-                              void *context)
+SilcBool silc_packet_wrap_notifier(SilcStream stream,
+                                  SilcSchedule schedule,
+                                  SilcStreamNotifier callback,
+                                  void *context)
 {
   SilcPacketWrapperStream pws = stream;
 
-  if (pws->closed)
-    return;
+  if (pws->closed || pws->blocking)
+    return FALSE;
 
   /* Link to receive packets */
   if (callback)
@@ -2396,6 +2442,8 @@ void silc_packet_wrap_notifier(SilcStream stream,
 
   pws->callback = callback;
   pws->context = context;
+
+  return TRUE;
 }
 
 /* Return schedule */
@@ -2409,7 +2457,8 @@ SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream)
 
 SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
                                    SilcPacketType type,
-                                   SilcPacketFlags flags)
+                                   SilcPacketFlags flags,
+                                  SilcBool blocking_mode)
 {
   SilcPacketWrapperStream pws;
 
@@ -2423,8 +2472,25 @@ SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
   pws->stream = stream;
   pws->type = type;
   pws->flags = flags;
+  pws->blocking = blocking_mode;
+
+  if (pws->blocking) {
+    /* Blocking mode.  Use packet waiter to do the thing. */
+    pws->waiter = silc_packet_wait_init(pws->stream, pws->type, -1);
+    if (!pws->waiter) {
+      silc_free(pws);
+      return NULL;
+    }
+  } else {
+    /* Non-blocking mode */
+    if (!silc_mutex_alloc(&pws->lock)) {
+      silc_free(pws);
+      return NULL;
+    }
+
+    silc_list_init(pws->in_queue, struct SilcPacketStruct, next);
+  }
 
-  silc_list_init(pws->in_queue, struct SilcPacketStruct, next);
   silc_packet_stream_ref(stream);
 
   return (SilcStream)pws;
index 48a25793c4a34858c0b9a256c4caa23c5d02c507..7ddabff11732bfbfc7b7c6fe856756738b1847f7 100644 (file)
@@ -618,7 +618,8 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
  *
  *    SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
  *                                       SilcPacketType type,
- *                                       SilcPacketFlags flags);
+ *                                       SilcPacketFlags flags,
+ *                                       SilcBool blocking_mode);
  *
  * DESCRIPTION
  *
@@ -630,11 +631,18 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
  *    stream can be destroyed by calling silc_stream_destroy.  It does not
  *    destroy the wrapped packet stream.
  *
- *    The silc_stream_set_notifier must be called before the returned stream
- *    can be used to receive packets.  The SILC_STREAM_CAN_READ will be
- *    returned to the notifier callback to indicate that a packet is ready
- *    for reading.  Calling silc_stream_read once returns one complete SILC
- *    packet data payload (which is of type of `type').
+ *    If the `blocking_mode' mode is TRUE then the silc_stream_read and
+ *    silc_stream_write may block the calling process or thread until SILC
+ *    packet is read or written.  If it is FALSE the stream is in non-blocking
+ *    mode and the calls never block.  The returned stream is thread-safe and
+ *    packets may be read and written in multi-threaded environment.
+ *
+ *    In non-blocking mode the silc_stream_set_notifier must be called before
+ *    the returned stream can be used to read packets.  The stream status
+ *    SILC_STREAM_CAN_READ will be returned to the notifier callback to
+ *    indicate that a packet is ready for reading.  Calling silc_stream_read
+ *    once returns one complete SILC packet data payload (which is of type of
+ *    `type').
  *
  *    The returned SilcStream can be used as any normal stream and all
  *    SilcStream API functions may be used with the stream.  This returns
@@ -643,7 +651,8 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
  ***/
 SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
                                   SilcPacketType type,
-                                  SilcPacketFlags flags);
+                                  SilcPacketFlags flags,
+                                  SilcBool blocking_mode);
 
 /****f* silccore/SilcPacketAPI/silc_packet_get_sender
  *