Fixed packet wrapper stream API to support encoder/decoder
authorPekka Riikonen <priikone@silcnet.org>
Wed, 14 Feb 2007 14:47:58 +0000 (14:47 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Wed, 14 Feb 2007 14:47:58 +0000 (14:47 +0000)
for packets, and to handle partial reading correctly.  Fixed
also inbuf size checking in reading to have enough space before
reading.

lib/silccore/silcpacket.c
lib/silccore/silcpacket.h

index f6b4c87ff7a587ffd285801456683f75aec0fc30..df8f0bf429b73d72effa56b78edf7d22f96cf614 100644 (file)
@@ -299,6 +299,11 @@ static inline SilcBool silc_packet_stream_read(SilcPacketStream ps,
   stream = ps->stream;
   inbuf = &ps->sc->inbuf;
 
+  /* Make sure there is enough room to read */
+  if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf))
+    silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) +
+                       (SILC_PACKET_DEFAULT_SIZE * 2));
+
   if (silc_socket_stream_is_udp(stream, &connected)) {
     if (!connected) {
       /* Connectionless UDP stream, read one UDP packet */
@@ -404,7 +409,7 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
   case SILC_STREAM_CAN_READ:
     /* Reading is locked also with stream->lock because we may be reading
        at the same time other thread is writing to same underlaying stream. */
-    SILC_LOG_DEBUG(("Reading data from stream"));
+    SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps));
 
     /* Read data from stream */
     if (!silc_packet_stream_read(ps, &remote))
@@ -423,7 +428,8 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
     break;
 
   case SILC_STREAM_CAN_WRITE:
-    SILC_LOG_DEBUG(("Writing pending data to stream"));
+    SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p",
+                   ps->stream, ps));
 
     if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) {
       silc_mutex_unlock(ps->lock);
@@ -673,7 +679,7 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine,
   silc_mutex_unlock(engine->lock);
 
   /* Set IO notifier callback.  This schedules this stream for I/O. */
-  if (!silc_stream_set_notifier(ps->stream, schedule, 
+  if (!silc_stream_set_notifier(ps->stream, schedule,
                                silc_packet_stream_io, ps)) {
     SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream"));
     silc_packet_stream_destroy(ps);
@@ -2282,7 +2288,10 @@ typedef struct {
   const SilcStreamOps *ops;
   SilcPacketStream stream;
   SilcMutex lock;
-  void *waiter;                        /* Waiter context in blocking mode */
+  void *waiter;                          /* Waiter context in blocking mode */
+  SilcPacketWrapCoder coder;
+  void *coder_context;
+  SilcBuffer encbuf;
   SilcStreamNotifier callback;
   void *context;
   SilcList in_queue;
@@ -2290,6 +2299,7 @@ typedef struct {
   SilcPacketFlags flags;
   unsigned int closed        : 1;
   unsigned int blocking      : 1;
+  unsigned int read_more     : 1;
 } *SilcPacketWrapperStream;
 
 /* Packet wrapper callbacks */
@@ -2309,7 +2319,7 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine,
 {
   SilcPacketWrapperStream pws = callback_context;
 
-  if (!pws->closed || !pws->callback)
+  if (pws->closed || !pws->callback)
     return FALSE;
 
   silc_mutex_lock(pws->lock);
@@ -2322,6 +2332,19 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine,
   return TRUE;
 }
 
+/* Task callback to notify more data is available for reading */
+
+SILC_TASK_CALLBACK(silc_packet_wrap_read_more)
+{
+  SilcPacketWrapperStream pws = context;
+
+  if (pws->closed || !pws->callback)
+    return;
+
+  /* Call notifier callback */
+  pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
+}
+
 /* Read SILC packet */
 
 int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
@@ -2329,6 +2352,7 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
 {
   SilcPacketWrapperStream pws = stream;
   SilcPacket packet;
+  SilcBool read_more = FALSE;
   int len;
 
   if (pws->closed)
@@ -2354,12 +2378,31 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
     silc_mutex_unlock(pws->lock);
   }
 
+  /* Call decoder if set */
+  if (pws->coder && !pws->read_more)
+    pws->coder(stream, SILC_STREAM_CAN_READ, &packet->buffer,
+              pws->coder_context);
+
   len = silc_buffer_len(&packet->buffer);
-  if (len > buf_len)
+  if (len > buf_len) {
     len = buf_len;
+    read_more = TRUE;
+  }
 
+  /* Read data */
   memcpy(buf, packet->buffer.data, len);
 
+  if (read_more && !pws->blocking) {
+    /* More data will be available (in blocking mode not supported). */
+    silc_buffer_pull(&packet->buffer, len);
+    silc_list_insert(pws->in_queue, NULL, packet); 
+    silc_schedule_task_add_timeout(pws->stream->sc->schedule,
+                                  silc_packet_wrap_read_more, pws, 0, 0);
+    pws->read_more = TRUE;
+    return len;
+  }
+
+  pws->read_more = FALSE;
   silc_packet_free(packet);
   return len;
 }
@@ -2370,10 +2413,27 @@ int silc_packet_wrap_write(SilcStream stream, const unsigned char *data,
                           SilcUInt32 data_len)
 {
   SilcPacketWrapperStream pws = stream;
+  SilcBool ret = FALSE;
+
+  /* Call decoder if set */
+  if (pws->coder) {
+    silc_buffer_reset(pws->encbuf);
+    ret = pws->coder(stream, SILC_STREAM_CAN_WRITE, pws->encbuf,
+                    pws->coder_context);
+  }
 
   /* Send the SILC packet */
-  if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len))
-    return -2;
+  if (ret) {
+    if (!silc_packet_send_va(pws->stream, pws->type, pws->flags,
+                            SILC_STR_DATA(silc_buffer_data(pws->encbuf),
+                                          silc_buffer_len(pws->encbuf)),
+                            SILC_STR_DATA(data, data_len),
+                            SILC_STR_END))
+      return -2;
+  } else {
+    if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len))
+      return -2;
+  }
 
   return data_len;
 }
@@ -2416,6 +2476,8 @@ void silc_packet_wrap_destroy(SilcStream stream)
     silc_packet_free(packet);
   if (pws->lock)
     silc_mutex_free(pws->lock);
+  if (pws->encbuf)
+    silc_buffer_free(pws->encbuf);
   silc_packet_stream_unref(pws->stream);
 
   silc_free(pws);
@@ -2458,7 +2520,9 @@ SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream)
 SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
                                    SilcPacketType type,
                                    SilcPacketFlags flags,
-                                  SilcBool blocking_mode)
+                                  SilcBool blocking_mode,
+                                  SilcPacketWrapCoder coder,
+                                  void *context)
 {
   SilcPacketWrapperStream pws;
 
@@ -2473,6 +2537,12 @@ SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
   pws->type = type;
   pws->flags = flags;
   pws->blocking = blocking_mode;
+  pws->coder = coder;
+  pws->coder_context = context;
+
+  /* Allocate small amount for encoder buffer. */
+  if (pws->coder)
+    pws->encbuf = silc_buffer_alloc(8);
 
   if (pws->blocking) {
     /* Blocking mode.  Use packet waiter to do the thing. */
index 7ddabff11732bfbfc7b7c6fe856756738b1847f7..cef13ca27a978cc49584cc9c337ec75efcf22a37 100644 (file)
@@ -114,9 +114,10 @@ typedef SilcUInt8 SilcPacketFlags;
 #define SILC_PACKET_FLAG_LIST             0x02   /* Packet is a list */
 #define SILC_PACKET_FLAG_BROADCAST        0x04   /* Packet is a broadcast */
 #define SILC_PACKET_FLAG_COMPRESSED       0x08    /* Payload is compressed */
+#define SILC_PACKET_FLAG_ACK              0x10    /* Acknowledge packet */
 
 /* Impelemntation specific flags */
-#define SILC_PACKET_FLAG_LONG_PAD         0x10    /* Use maximum padding */
+#define SILC_PACKET_FLAG_LONG_PAD         0x12    /* Use maximum padding */
 /***/
 
 /****s* silccore/SilcPacketAPI/SilcPacketEngine
@@ -612,6 +613,36 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
                               SilcPacketCallbacks *callbacks,
                               void *callback_context);
 
+/****f* silccore/SilcPacketAPI/SilcPacketWrapCoder
+ *
+ * SYNOPSIS
+ *
+ *    typedef SilcBool (*SilcPacketWrapCoder)(SilcStream stream,
+ *                                            SilcStreamStatus status,
+ *                                            SilcBuffer buffer,
+ *                                            void *context);
+ *
+ * DESCRIPTION
+ *
+ *    The encoder/decoder callback for silc_packet_stream_wrap.  If the
+ *    `status' is SILC_STREAM_CAN_WRITE then additional data can be added
+ *    to `buffer'.  It is added before the data that is written with
+ *    silc_stream_write.  The silc_buffer_enlarge should be called to verify
+ *    there is enough room in `buffer' before adding data to it.  The `buffer'
+ *    must not be freed.
+ *
+ *    If the `status' is SILC_STREAM_CAN_READ then data from the `buffer'
+ *    may be read before it is passed to readed when silc_stream_read is
+ *    called.  The `buffer' may be advanced also to hide data in it.
+ *
+ *    This function returns FALSE in case of error.
+ *
+ ***/
+typedef SilcBool (*SilcPacketWrapCoder)(SilcStream stream,
+                                       SilcStreamStatus status,
+                                       SilcBuffer buffer,
+                                       void *context);
+
 /****f* silccore/SilcPacketAPI/silc_packet_stream_wrap
  *
  * SYNOPSIS
@@ -619,7 +650,9 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
  *    SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
  *                                       SilcPacketType type,
  *                                       SilcPacketFlags flags,
- *                                       SilcBool blocking_mode);
+ *                                       SilcBool blocking_mode,
+ *                                       SilcPacketWrapCoder coder,
+ *                                       void *context);
  *
  * DESCRIPTION
  *
@@ -644,6 +677,12 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
  *    once returns one complete SILC packet data payload (which is of type of
  *    `type').
  *
+ *    The `coder' is optional encoder/decoder callback which the packet engine
+ *    will call if it is non-NULL.  It can be used to encode additional data
+ *    into each packet when silc_stream_write is called or decode data before
+ *    it is passed to reader when silc_stream_read is called.  The `context'
+ *    is passed to `coder'.
+ *
  *    The returned SilcStream can be used as any normal stream and all
  *    SilcStream API functions may be used with the stream.  This returns
  *    NULL on error.
@@ -652,7 +691,9 @@ void silc_packet_stream_unlink(SilcPacketStream stream,
 SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
                                   SilcPacketType type,
                                   SilcPacketFlags flags,
-                                  SilcBool blocking_mode);
+                                  SilcBool blocking_mode,
+                                  SilcPacketWrapCoder coder,
+                                  void *context);
 
 /****f* silccore/SilcPacketAPI/silc_packet_get_sender
  *