Added SILC Buffer Stream API
authorPekka Riikonen <priikone@silcnet.org>
Tue, 17 Jun 2008 15:34:33 +0000 (18:34 +0300)
committerPekka Riikonen <priikone@silcnet.org>
Tue, 17 Jun 2008 15:34:33 +0000 (18:34 +0300)
Buffer stream interface to send and receive buffers.  The benefit of this
interface is that the receiver need not parse buffers from the received
data but each buffer sent is delivered separately to the receiver
callback, even if multiple buffers were received at the same time.  The
length of the buffer is delivered with the data.  The buffer data follows
a 32-bit length field in the stream.

lib/silcutil/Makefile.ad
lib/silcutil/silcbufferstream.c [new file with mode: 0644]
lib/silcutil/silcbufferstream.h [new file with mode: 0644]
lib/silcutil/silcruntime.h.in
lib/silcutil/tests/Makefile.am
lib/silcutil/tests/test_silcbufferstream.c [new file with mode: 0644]

index ef7efb9c7270ebef541049be0ac80db167c512a1..75df24b23722da997fc8dd777dfa5c5dc64f3f08 100644 (file)
@@ -67,7 +67,8 @@ libsilcutil_la_SOURCES = \
        silcregex.c     \
        silcthreadqueue.c \
        silcrand.c      \
-       silcglobal.c
+       silcglobal.c    \
+       silcbufferstream.c
 
 include_HEADERS =      \
        $(SILC_DIST_HEADER) \
@@ -122,7 +123,8 @@ include_HEADERS =   \
        silcrand.h      \
        silcglobal.h    \
        silcruntime.h   \
-       silcdir.h
+       silcdir.h       \
+       silcbufferstream.h
 
 SILC_EXTRA_DIST =
 
diff --git a/lib/silcutil/silcbufferstream.c b/lib/silcutil/silcbufferstream.c
new file mode 100644 (file)
index 0000000..9458c12
--- /dev/null
@@ -0,0 +1,315 @@
+/*
+
+  silcbufferstream.c
+
+  Author: Pekka Riikonen <priikone@silcnet.org>
+
+  Copyright (C) 2008 Pekka Riikonen
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+*/
+
+#include <silcruntime.h>
+
+/************************** Types and definitions ***************************/
+
+#define SILC_IS_BUFFER_STREAM(s) (s && s->ops == &silc_buffer_stream_ops)
+
+const SilcStreamOps silc_buffer_stream_ops;
+
+/* Buffer stream context */
+typedef struct {
+  const SilcStreamOps *ops;
+  SilcStream stream;
+  SilcBuffer outbuf;
+  SilcBuffer inbuf;
+  SilcBufferStruct queue;
+  SilcBufferReceiveCallback receiver;
+  void *context;
+  unsigned int closed   : 1;
+} *SilcBufferStream;
+
+/************************ Static utility functions **************************/
+
+/* IO callback */
+
+static void silc_buffer_stream_io(SilcStream stream,
+                                 SilcStreamStatus status,
+                                 void *context)
+{
+  SilcBufferStream bs = context;
+  SilcBuffer buffer = NULL;
+  SilcUInt32 buf_len;
+  int ret, len;
+
+  if (bs->closed)
+    return;
+
+  if (status == SILC_STREAM_CAN_READ) {
+    /* Read data */
+    SILC_LOG_DEBUG(("Read data from buffer stream %p", bs));
+
+    while ((ret = silc_stream_read(bs->stream, bs->inbuf->tail,
+                                  silc_buffer_taillen(bs->inbuf))) > 0) {
+      if (!buffer) {
+       buffer = silc_buffer_alloc(0);
+       if (!buffer)
+         return;
+      }
+
+      silc_buffer_pull_tail(bs->inbuf, ret);
+
+      /* Parse the buffer */
+      while ((len = silc_buffer_unformat(bs->inbuf,
+                                        SILC_STR_BUFFER_ALLOC(buffer),
+                                        SILC_STR_END)) > 0) {
+       /* Deliver the buffer */
+       SILC_LOG_HEXDUMP(("Received buffer, size %d",
+                         silc_buffer_len(buffer)),
+                        silc_buffer_data(buffer), silc_buffer_len(buffer));
+       bs->receiver(SILC_OK, (SilcStream)bs, buffer, bs->context);
+       silc_buffer_pull(bs->inbuf, len);
+
+       buffer = silc_buffer_alloc(0);
+       if (!buffer)
+         return;
+      }
+
+      if (silc_buffer_len(bs->inbuf) > 0) {
+       /* Not complete buffer, read more data */
+       buf_len = 4;
+       if (silc_buffer_len(bs->inbuf) >= 4) {
+         SILC_GET32_MSB(buf_len, bs->inbuf->data);
+         SILC_LOG_DEBUG(("Incomplete buffer, wait for rest, buffer size %d",
+                         buf_len));
+       }
+
+       /* Enlarge inbuf if needed */
+       if (silc_buffer_taillen(bs->inbuf) < buf_len)
+         silc_buffer_realloc(bs->inbuf, silc_buffer_truelen(bs->inbuf) +
+                             buf_len);
+       continue;
+      }
+
+      /* All data read, read more */
+      silc_buffer_reset(bs->inbuf);
+    }
+
+    silc_buffer_free(buffer);
+
+    if (ret == 0 || ret == -2) {
+      bs->receiver(silc_errno, (SilcStream)bs, NULL, bs->context);
+      return;
+    }
+  } else {
+    /* Write any pending data */
+    SILC_LOG_DEBUG(("Write pending data to buffer stream %p", bs));
+
+    while (silc_buffer_len(&bs->queue) > 0) {
+      ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
+                             silc_buffer_len(&bs->queue));
+      if (silc_unlikely(ret == 0))
+       return;
+
+      if (silc_unlikely(ret == -2))
+       return;
+
+      if (silc_unlikely(ret == -1)) {
+       SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
+       return;
+      }
+
+      /* Wrote data */
+      silc_buffer_pull(&bs->queue, ret);
+    }
+
+    memset(&bs->queue, 0, sizeof(bs->queue));
+    silc_buffer_reset(bs->outbuf);
+  }
+}
+
+/****************************** Public API **********************************/
+
+/* Create buffer stream */
+
+SilcStream silc_buffer_stream_create(SilcStream stream,
+                                    SilcBufferReceiveCallback receiver,
+                                    void *context)
+{
+  SilcBufferStream bs;
+
+  if (!stream || !receiver) {
+    silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
+    return NULL;
+  }
+
+  bs = silc_calloc(1, sizeof(*bs));
+  if (!bs)
+    return NULL;
+
+  SILC_LOG_DEBUG(("Created new buffer stream %p", bs));
+
+  bs->ops = &silc_buffer_stream_ops;
+  bs->stream = stream;
+  bs->receiver = receiver;
+  bs->context = context;
+  bs->inbuf = silc_buffer_alloc(32);
+  bs->outbuf = silc_buffer_alloc(0);
+  if (!bs->inbuf || !bs->outbuf) {
+    silc_buffer_free(bs->inbuf);
+    silc_buffer_free(bs->outbuf);
+    silc_free(bs);
+    return NULL;
+  }
+
+  /* Set IO callback to the underlaying stream */
+  silc_stream_set_notifier(bs->stream,
+                          silc_stream_get_schedule(bs->stream),
+                          silc_buffer_stream_io, bs);
+
+  return (SilcStream)bs;
+}
+
+/* Send buffer to stream */
+
+SilcBool silc_buffer_stream_send(SilcStream stream,
+                                SilcBuffer buffer)
+{
+  SilcBufferStream bs = stream;
+  int ret;
+
+  SILC_LOG_HEXDUMP(("Send to buffer stream %p %d bytes", bs,
+                   silc_buffer_len(buffer)),
+                  silc_buffer_data(buffer), silc_buffer_len(buffer));
+
+  if (silc_unlikely(!SILC_IS_BUFFER_STREAM(bs))) {
+    silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
+    return FALSE;
+  }
+
+  if (silc_unlikely(!buffer)) {
+    silc_set_errno(SILC_ERR_INVALID_ARGUMENT);
+    return FALSE;
+  }
+
+  if (silc_unlikely(bs->closed)) {
+    SILC_LOG_DEBUG(("Buffer stream %p is closed", bs));
+    silc_set_errno(SILC_ERR_NOT_VALID);
+    return FALSE;
+  }
+
+  /* Put to queue */
+  if (silc_buffer_format(bs->outbuf,
+                        SILC_STR_ADVANCE,
+                        SILC_STR_BUFFER(buffer),
+                        SILC_STR_END) < 0)
+    return FALSE;
+
+  ret = silc_buffer_headlen(&bs->queue);
+  bs->queue.head = bs->outbuf->head;
+  bs->queue.data = bs->queue.head + ret;
+  bs->queue.tail = bs->outbuf->data;
+  bs->queue.end  = bs->outbuf->end;
+
+  /* Write the queue buffer */
+  while (silc_buffer_len(&bs->queue) > 0) {
+    ret = silc_stream_write(bs->stream, silc_buffer_data(&bs->queue),
+                           silc_buffer_len(&bs->queue));
+    if (silc_unlikely(ret == 0))
+      return FALSE;
+
+    if (silc_unlikely(ret == -2))
+      return FALSE;
+
+    if (silc_unlikely(ret == -1)) {
+      SILC_LOG_DEBUG(("Buffer stream %p would block, send later", bs));
+      return TRUE;
+    }
+
+    /* Wrote data */
+    silc_buffer_pull(&bs->queue, ret);
+  }
+
+  memset(&bs->queue, 0, sizeof(bs->queue));
+  silc_buffer_reset(bs->outbuf);
+
+  SILC_LOG_DEBUG(("Buffer sent to buffer stream %p", bs));
+
+  return TRUE;
+}
+
+/******************************* Stream API *********************************/
+
+int silc_buffer_stream_read(SilcStream stream, unsigned char *buf,
+                           SilcUInt32 buf_len)
+{
+  SILC_LOG_ERROR(("The silc_stream_read cannot be used with buffer streams"));
+  return -2;
+}
+
+int silc_buffer_stream_write(SilcStream stream, const unsigned char *data,
+                            SilcUInt32 data_len)
+{
+  SILC_LOG_ERROR(("Use silc_buffer_stream_send with buffer streams"));
+  return -2;
+}
+
+SilcBool silc_buffer_stream_close(SilcStream stream)
+{
+  SilcBufferStream bs = stream;
+
+  SILC_LOG_DEBUG(("Closing buffer stream %p", bs));
+
+  bs->closed = TRUE;
+  silc_stream_set_notifier(bs->stream,
+                          silc_stream_get_schedule(bs->stream), NULL, NULL);
+
+  return TRUE;
+}
+
+void silc_buffer_stream_destroy(SilcStream stream)
+{
+  SilcBufferStream bs = stream;
+
+  SILC_LOG_DEBUG(("Destroying buffer stream %p", bs));
+
+  silc_buffer_stream_close(stream);
+  silc_buffer_free(bs->outbuf);
+  silc_buffer_free(bs->inbuf);
+  silc_free(bs);
+}
+
+SilcBool silc_buffer_stream_notifier(SilcStream stream,
+                                    SilcSchedule schedule,
+                                    SilcStreamNotifier callback,
+                                    void *context)
+{
+  SILC_LOG_ERROR(("The silc_stream_set_notifier cannot be used with "
+                 "buffer streams"));
+  return FALSE;
+}
+
+SilcSchedule silc_buffer_stream_get_schedule(SilcStream stream)
+{
+  SilcBufferStream bs = stream;
+  return silc_stream_get_schedule(bs->stream);
+}
+
+/* Buffer stream operations */
+const SilcStreamOps silc_buffer_stream_ops =
+{
+  silc_buffer_stream_read,
+  silc_buffer_stream_write,
+  silc_buffer_stream_close,
+  silc_buffer_stream_destroy,
+  silc_buffer_stream_notifier,
+  silc_buffer_stream_get_schedule
+};
diff --git a/lib/silcutil/silcbufferstream.h b/lib/silcutil/silcbufferstream.h
new file mode 100644 (file)
index 0000000..f5c7ce0
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+
+  silcbufferstream.h
+
+  Author: Pekka Riikonen <priikone@silcnet.org>
+
+  Copyright (C) 2008 Pekka Riikonen
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+*/
+
+/****h* silcutil/Buffer Stream Interface
+ *
+ * DESCRIPTION
+ *
+ * Buffer stream interface to send and receive buffers.  The benefit of this
+ * interface is that the receiver need not parse buffers from the received
+ * data but each buffer sent is delivered separately to the receiver
+ * callback, even if multiple buffers were received at the same time.  The
+ * length of the buffer is delivered with the data.  The buffer data follows
+ * a 32-bit length field in the stream.
+ *
+ * This interface is named SILC Buffer Stream API instead of simply SILC
+ * Packet API which would be more desriptive name but that API name is already
+ * used by another SILC distribution.
+ *
+ ***/
+
+#ifndef SILCBUFFERSTREAM_H
+#define SILCBUFFERSTREAM_H
+
+/****f* silcutil/SilcBufferReceiveCallback
+ *
+ * SYNOPSIS
+ *
+ *    typedef void (*SilcBufferReceiveCallback)(SilcResult status,
+ *                                              SilcStream stream,
+ *                                              SilcBuffer buffer,
+ *                                              void *context);
+ *
+ * DESCRIPTION
+ *
+ *    Callback function to deliver the received `buffer' from the `stream'.
+ *    The `buffer' is the buffer that was sent to the stream.  If more than
+ *    one buffers were sent each is delivered separately to this callback.
+ *    The `status' will indicate an error if such occurred in the stream.
+ *    The `buffer' is NULL in case of error.  The receiver must free
+ *    the `buffer'.
+ *
+ ***/
+typedef void (*SilcBufferReceiveCallback)(SilcResult status,
+                                         SilcStream stream,
+                                         SilcBuffer buffer,
+                                         void *context);
+
+/****f* silcutil/silc_buffer_stream_create
+ *
+ * SYNOPSIS
+ *
+ *    SilcStream silc_buffer_stream_create(SilcStream stream,
+ *                                         SilcBufferReceiveCallback receiver,
+ *                                         void *context);
+ *
+ * DESCRIPTION
+ *
+ *    Creates a buffer stream and returns it.  The `stream' is the underlaying
+ *    stream to be used to actually send the buffer and receive buffers.
+ *    The returned stream is used with this API to send the buffers.  The
+ *    `stream' must stay valid as long the buffer stream is used.
+ *
+ *    To send buffers to the stream silc_buffer_stream_send can be used.
+ *    The silc_stream_write cannot be used with the returned stream.  Buffers
+ *    coming from the `stream' will be delivered to the `receiver' callback.
+ *    The returned stream and `context' will also be delivered to `receiver'.
+ *
+ *    The returned stream must be destroyed by calling silc_stream_destroy.
+ *    Other SilcStream API functions cannot be used with buffer stream.
+ *
+ ***/
+SilcStream silc_buffer_stream_create(SilcStream stream,
+                                    SilcBufferReceiveCallback receiver,
+                                    void *context);
+
+/****f* silcutil/silc_buffer_stream_send
+ *
+ * SYNOPSIS
+ *
+ *    SilcBool silc_buffer_stream_send(SilcStream stream,
+ *                                     SilcBuffer buffer);
+ *
+ * DESCRIPTION
+ *
+ *    Sends `buffer' to the buffer stream indicated by `stream'.  If the
+ *    `stream' is not a buffer stream created by silc_buffer_stream_create
+ *    this will return FALSE.  Returns FALSE on error and sets silc_errno.
+ *
+ ***/
+SilcBool silc_buffer_stream_send(SilcStream stream,
+                                SilcBuffer buffer);
+
+#endif /* SILCBUFFERSTREAM_H */
index a458af67a64c32875e6eceb376a0bc5ac0fbbaf1..503b3924a8125e7f9a99ae5063974f3222082455 100644 (file)
@@ -239,6 +239,7 @@ extern "C" {
 #include <silcfdstream.h>
 #include <silcmime.h>
 #include <silcrand.h>
+#include <silcbufferstream.h>
 #include <silchttpserver.h>
 #include <silchttpphp.h>
 
index 3b81a73a6433c14afbd7d9bd150b8164ce767167..ceade7d458d5f6b2fb5e6d130be76b55ded0150f 100644 (file)
@@ -24,7 +24,7 @@ check_PROGRAMS = \
        test_silcatomic test_silcmutex test_silctime test_silcthread \
        test_silcdll test_silcenv test_silctimer test_silcbitops \
        test_silcregex test_silcbuffmt test_silcdir test_silcthreadqueue \
-       test_silcrand test_silcglobal
+       test_silcrand test_silcglobal test_silcbufferstream
 
 TESTS = test_silcstrutil test_silcstringprep test_silchashtable \
        test_silclist test_silcfsm test_silcasync test_silcschedule \
@@ -32,7 +32,7 @@ TESTS = test_silcstrutil test_silcstringprep test_silchashtable \
        test_silcatomic test_silctime test_silcthread \
        test_silcdll test_silcenv test_silctimer test_silcbitops \
        test_silcregex test_silcbuffmt test_silcdir test_silcthreadqueue \
-       test_silcrand test_silcglobal
+       test_silcrand test_silcglobal test_silcbufferstream
 
 LIBS = $(SILC_COMMON_LIBS)
 LDADD = -L.. -L../.. -lsrt
diff --git a/lib/silcutil/tests/test_silcbufferstream.c b/lib/silcutil/tests/test_silcbufferstream.c
new file mode 100644 (file)
index 0000000..488a4de
--- /dev/null
@@ -0,0 +1,377 @@
+/* SILC Buffer Stream tests */
+
+#include "silcruntime.h"
+
+SilcSchedule schedule;
+
+typedef struct {
+  SilcFSM fsm;
+  SilcFSMEventStruct sema;
+  SilcFSMEventStruct s_sema;
+  SilcFSMEventStruct c_sema;
+  SilcFSMThreadStruct thread;
+  SilcNetListener server;
+  SilcStream client_stream;
+  SilcStream cbuf;
+  int c_num_buf;
+  SilcResult client_status;
+  SilcStream server_stream;
+  SilcStream sbuf;
+  int s_num_buf;
+  SilcResult server_status;
+  SilcBool success;
+} *Foo;
+
+SILC_FSM_STATE(test_st_start);
+SILC_FSM_STATE(test_st_second);
+SILC_FSM_STATE(test_st_finish);
+
+SILC_FSM_STATE(test_st_connect);
+SILC_FSM_STATE(test_st_connected);
+
+SILC_FSM_STATE(test_st_s_send_buffers);
+SILC_FSM_STATE(test_st_s_receive_buffers);
+SILC_FSM_STATE(test_st_c_send_buffers);
+SILC_FSM_STATE(test_st_c_receive_buffers);
+
+static void test_accept_connection(SilcResult status, SilcStream stream,
+                                  void *context)
+{
+  Foo f = context;
+  SILC_LOG_DEBUG(("Accepted new connection"));
+  f->client_status = status;
+  f->client_stream = stream;
+  SILC_FSM_EVENT_SIGNAL(&f->sema);
+}
+
+static void test_connected(SilcResult status, SilcStream stream,
+                          void *context)
+{
+  Foo f = context;
+  SILC_LOG_DEBUG(("Connected to server"));
+  f->server_status = status;
+  f->server_stream = stream;
+  SILC_FSM_CALL_CONTINUE(&f->thread);
+}
+
+static void receive_s_buffer(SilcResult status, SilcStream stream,
+                            SilcBuffer buffer, void *context)
+{
+  Foo f = context;
+  SILC_LOG_DEBUG(("Received buffer"));
+  silc_buffer_free(buffer);
+  SILC_FSM_EVENT_SIGNAL(&f->s_sema);
+}
+
+static void receive_c_buffer(SilcResult status, SilcStream stream,
+                            SilcBuffer buffer, void *context)
+{
+  Foo f = context;
+  SILC_LOG_DEBUG(("Received buffer"));
+  silc_buffer_free(buffer);
+  SILC_FSM_EVENT_SIGNAL(&f->c_sema);
+}
+
+SILC_FSM_STATE(test_st_connect)
+{
+  Foo f = fsm_context;
+
+  SILC_LOG_DEBUG(("test_st_connect"));
+  SILC_LOG_DEBUG(("Connecting to server"));
+
+  silc_fsm_next(fsm, test_st_connected);
+  SILC_FSM_CALL(silc_net_tcp_connect(NULL, "localhost", 5000,
+                                    silc_fsm_get_schedule(fsm),
+                                    test_connected, f));
+}
+
+SILC_FSM_STATE(test_st_connected)
+{
+  Foo f = fsm_context;
+  const char *host, *ip;
+  SilcUInt16 port;
+
+  SILC_LOG_DEBUG(("test_st_connected"));
+
+  if (f->server_status != SILC_OK) {
+    SILC_LOG_DEBUG(("Creating connection failed"));
+    return SILC_FSM_FINISH;
+  }
+
+  silc_socket_stream_get_info(f->server_stream, NULL, &host, &ip, &port);
+  SILC_LOG_DEBUG(("Connected to server %s, %s:%d", host, ip, port));
+
+  f->sbuf = silc_buffer_stream_create(f->server_stream,
+                                     receive_c_buffer, f);
+  if (!f->sbuf) {
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  silc_fsm_next(fsm, test_st_c_receive_buffers);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_start)
+{
+  Foo f = fsm_context;
+  int ports[3];
+  SilcUInt16 *ret_ports;
+  SilcUInt32 port_count;
+
+  SILC_LOG_DEBUG(("test_st_start"));
+
+  SILC_LOG_DEBUG(("Creating network listener to ports 2000, 3000 and 4000"));
+  ports[0] = 2000;
+  ports[1] = 3000;
+  ports[2] = 4000;
+  f->server = silc_net_tcp_create_listener2(NULL, ports, 3, FALSE, TRUE, TRUE,
+                                    silc_fsm_get_schedule(fsm),
+                                    test_accept_connection, f);
+  if (!f->server) {
+    /** Creating network listener failed */
+    SILC_LOG_DEBUG(("Listener creation failed"));
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  ret_ports = silc_net_listener_get_port(f->server, &port_count);
+  if (!ret_ports) {
+    SILC_LOG_DEBUG(("Listener does not work"));
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+  SILC_LOG_DEBUG(("Bound to port %d", ret_ports[0]));
+  SILC_LOG_DEBUG(("Bound to port %d", ret_ports[1]));
+  SILC_LOG_DEBUG(("Bound to port %d", ret_ports[2]));
+  silc_free(ret_ports);
+
+  /* Close this listener and create new one */
+  silc_net_close_listener(f->server);
+
+  SILC_LOG_DEBUG(("Creating network listener"));
+  f->server = silc_net_tcp_create_listener(NULL, 0, 5000, TRUE, TRUE,
+                                    silc_fsm_get_schedule(fsm),
+                                    test_accept_connection, f);
+  if (!f->server) {
+    /** Creating network listener failed */
+    SILC_LOG_DEBUG(("Listener creation failed"));
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  /* Create thread to connect to the listener */
+  silc_fsm_thread_init(&f->thread, fsm, f, NULL, NULL, FALSE);
+  silc_fsm_start(&f->thread, test_st_connect);
+
+  /** Start waiting connection */
+  SILC_LOG_DEBUG(("Start waiting for incoming connections"));
+  silc_fsm_event_init(&f->sema, fsm);
+  silc_fsm_next(fsm, test_st_second);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_second)
+{
+  Foo f = fsm_context;
+  const char *ip, *host;
+  SilcUInt16 port;
+
+  SILC_LOG_DEBUG(("test_st_second"));
+
+  SILC_FSM_EVENT_WAIT(&f->sema);
+
+  if (f->client_status != SILC_OK) {
+    /** Accepting new connection failed */
+    SILC_LOG_DEBUG(("Accepting failed %d", f->client_status));
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  silc_socket_stream_get_info(f->client_stream, NULL, &host, &ip, &port);
+  SILC_LOG_DEBUG(("Accepted new connection %s, %s:%d", host, ip, port));
+
+  f->cbuf = silc_buffer_stream_create(f->client_stream,
+                                     receive_s_buffer, f);
+  if (!f->cbuf) {
+    silc_fsm_next(fsm, test_st_finish);
+    return SILC_FSM_CONTINUE;
+  }
+
+  silc_fsm_next(fsm, test_st_s_send_buffers);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_s_send_buffers)
+{
+  Foo f = fsm_context;
+  SilcBuffer buffer;
+  int i;
+
+  SILC_LOG_DEBUG(("test_st_s_send_buffers"));
+
+  for (i = 0; i < 10; i++) {
+    buffer = silc_buffer_alloc_size(99 * (i + 1));
+    if (!buffer) {
+      silc_fsm_next(fsm, test_st_finish);
+      return SILC_FSM_CONTINUE;
+    }
+
+    SILC_LOG_HEXDUMP(("Send buffer %d to client", i + 1),
+                    silc_buffer_data(buffer), silc_buffer_len(buffer));
+
+    if (!silc_buffer_stream_send(f->cbuf, buffer)) {
+      silc_fsm_next(fsm, test_st_finish);
+      return SILC_FSM_CONTINUE;
+    }
+
+    silc_buffer_free(buffer);
+  }
+
+  silc_fsm_next(fsm, test_st_s_receive_buffers);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_s_receive_buffers)
+{
+  Foo f = fsm_context;
+
+  SILC_LOG_DEBUG(("test_st_s_receive_buffers"));
+
+  SILC_FSM_EVENT_WAIT(&f->s_sema);
+
+  f->s_num_buf++;
+  SILC_LOG_DEBUG(("Received buffer %d", f->s_num_buf));
+  if (f->s_num_buf < 10)
+    return SILC_FSM_CONTINUE;
+
+  /** Wait thread to terminate */
+  f->success = TRUE;
+  silc_fsm_next(fsm, test_st_finish);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_c_receive_buffers)
+{
+  Foo f = fsm_context;
+
+  SILC_LOG_DEBUG(("test_st_c_receive_buffers"));
+
+  SILC_FSM_EVENT_WAIT(&f->c_sema);
+
+  f->c_num_buf++;
+  SILC_LOG_DEBUG(("Received buffer %d", f->c_num_buf));
+  if (f->c_num_buf < 10)
+    return SILC_FSM_CONTINUE;
+
+  silc_fsm_next(fsm, test_st_c_send_buffers);
+  return SILC_FSM_CONTINUE;
+}
+
+SILC_FSM_STATE(test_st_c_send_buffers)
+{
+  Foo f = fsm_context;
+  SilcBuffer buffer;
+  int i;
+
+  SILC_LOG_DEBUG(("test_st_c_send_buffers"));
+
+  for (i = 0; i < 10; i++) {
+    buffer = silc_buffer_alloc_size(13 * (i + 1));
+    if (!buffer)
+      continue;
+
+    SILC_LOG_HEXDUMP(("Send buffer %d to server", i + 1),
+                    silc_buffer_data(buffer), silc_buffer_len(buffer));
+
+    if (!silc_buffer_stream_send(f->sbuf, buffer)) {
+      silc_fsm_next(fsm, test_st_finish);
+      return SILC_FSM_CONTINUE;
+    }
+
+    silc_buffer_free(buffer);
+  }
+
+  return SILC_FSM_FINISH;
+}
+
+SILC_FSM_STATE(test_st_finish)
+{
+  Foo f = fsm_context;
+
+  SILC_LOG_DEBUG(("test_st_finish"));
+
+  if (f->sbuf)
+    silc_stream_destroy(f->sbuf);
+  if (f->cbuf)
+    silc_stream_destroy(f->cbuf);
+  if (f->server_stream) {
+    silc_stream_close(f->server_stream);
+    silc_stream_destroy(f->server_stream);
+  }
+  if (f->client_stream) {
+    silc_stream_close(f->client_stream);
+    silc_stream_destroy(f->client_stream);
+  }
+
+  SILC_LOG_DEBUG(("Closing network listener"));
+  silc_net_close_listener(f->server);
+
+  SILC_LOG_DEBUG(("Finish machine"));
+  return SILC_FSM_FINISH;
+}
+
+static void destructor(SilcFSM fsm, void *fsm_context,
+                      void *destructor_context)
+{
+  SILC_LOG_DEBUG(("FSM destructor, stopping scheduler"));
+  silc_fsm_free(fsm);
+  silc_schedule_stop(schedule);
+}
+
+int main(int argc, char **argv)
+{
+  SilcBool success = FALSE;
+  SilcFSM fsm;
+  Foo f;
+
+  if (argc > 1 && !strcmp(argv[1], "-d")) {
+    silc_log_debug(TRUE);
+    silc_log_debug_hexdump(TRUE);
+    silc_log_set_debug_string("*net*,*stream*,*errno*");
+  }
+
+  SILC_LOG_DEBUG(("Allocating scheduler"));
+  schedule = silc_schedule_init(0, NULL, NULL, NULL);
+
+  f = silc_calloc(1, sizeof(*f));
+  if (!f)
+    goto err;
+
+  SILC_LOG_DEBUG(("Allocating FSM context"));
+  fsm = silc_fsm_alloc(f, destructor, NULL, schedule);
+  if (!fsm)
+    goto err;
+  silc_fsm_start(fsm, test_st_start);
+  f->fsm = fsm;
+
+  silc_fsm_event_init(&f->s_sema, fsm);
+  silc_fsm_event_init(&f->c_sema, fsm);
+
+  SILC_LOG_DEBUG(("Running scheduler"));
+  silc_schedule(schedule);
+
+  if (!f->success)
+    goto err;
+
+  silc_schedule_uninit(schedule);
+  silc_free(f);
+
+  success = TRUE;
+
+ err:
+  SILC_LOG_DEBUG(("Testing was %s", success ? "SUCCESS" : "FAILURE"));
+  fprintf(stderr, "Testing was %s\n", success ? "SUCCESS" : "FAILURE");
+
+  return !success;
+}