+ if (!silc_packet_dispatch(packet))
+ break;
+ }
+
+ out:
+ /* Add inbuf back to free list, if we owned it. */
+ if (stream->inbuf) {
+ silc_dlist_add(stream->sc->inbufs, inbuf);
+ stream->inbuf = NULL;
+ }
+
+ silc_buffer_reset(inbuf);
+}
+
+/****************************** Packet Waiting ******************************/
+
+/* Packet wait receive callback */
+static SilcBool
+silc_packet_wait_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context);
+
+/* Packet waiting callbacks */
+static const SilcPacketCallbacks silc_packet_wait_cbs =
+{
+ silc_packet_wait_packet_receive, NULL, NULL
+};
+
+/* Packet waiting context */
+typedef struct {
+ SilcMutex wait_lock;
+ SilcCond wait_cond;
+ SilcList packet_queue;
+ unsigned char id[28];
+ unsigned int id_type : 2;
+ unsigned int id_len : 5;
+ unsigned int stopped : 1;
+} *SilcPacketWait;
+
+/* Packet wait receive callback */
+
+static SilcBool
+silc_packet_wait_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context)
+{
+ SilcPacketWait pw = callback_context;
+
+ /* If source ID is specified check for it */
+ if (pw->id_len) {
+ if (pw->id_type != packet->src_id_type ||
+ memcmp(pw->id, packet->src_id, pw->id_len))
+ return FALSE;
+ }
+
+ /* Signal the waiting thread for a new packet */
+ silc_mutex_lock(pw->wait_lock);
+
+ if (silc_unlikely(pw->stopped)) {
+ silc_mutex_unlock(pw->wait_lock);
+ return FALSE;
+ }
+
+ silc_list_add(pw->packet_queue, packet);
+ silc_cond_broadcast(pw->wait_cond);
+
+ silc_mutex_unlock(pw->wait_lock);
+
+ return TRUE;
+}
+
+/* Initialize packet waiting */
+
+void *silc_packet_wait_init(SilcPacketStream stream,
+ const SilcID *source_id, ...)
+{
+ SilcPacketWait pw;
+ SilcBool ret;
+ va_list ap;
+
+ pw = silc_calloc(1, sizeof(*pw));
+ if (!pw)
+ return NULL;
+
+ /* Allocate mutex and conditional variable */
+ if (!silc_mutex_alloc(&pw->wait_lock)) {
+ silc_free(pw);
+ return NULL;
+ }
+ if (!silc_cond_alloc(&pw->wait_cond)) {
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+ return NULL;
+ }
+
+ /* Link to the packet stream for the requested packet types */
+ va_start(ap, source_id);
+ ret = silc_packet_stream_link_va(stream, &silc_packet_wait_cbs, pw,
+ 10000000, ap);
+ va_end(ap);
+ if (!ret) {
+ silc_cond_free(pw->wait_cond);
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+ return NULL;
+ }
+
+ /* Initialize packet queue */
+ silc_list_init(pw->packet_queue, struct SilcPacketStruct, next);
+
+ if (source_id) {
+ SilcUInt32 id_len;
+ silc_id_id2str(SILC_ID_GET_ID(*source_id), source_id->type, pw->id,
+ sizeof(pw->id), &id_len);
+ pw->id_type = source_id->type;
+ pw->id_len = id_len;
+ }
+
+ return (void *)pw;
+}
+
+/* Uninitialize packet waiting */
+
+void silc_packet_wait_uninit(void *waiter, SilcPacketStream stream)
+{
+ SilcPacketWait pw = waiter;
+ SilcPacket packet;
+
+ /* Signal any threads to stop waiting */
+ silc_mutex_lock(pw->wait_lock);
+ pw->stopped = TRUE;
+ silc_cond_broadcast(pw->wait_cond);
+ silc_mutex_unlock(pw->wait_lock);
+ silc_thread_yield();
+
+ /* Re-acquire lock and free resources */
+ silc_mutex_lock(pw->wait_lock);
+ silc_packet_stream_unlink(stream, &silc_packet_wait_cbs, pw);
+
+ /* Free any remaining packets */
+ silc_list_start(pw->packet_queue);
+ while ((packet = silc_list_get(pw->packet_queue)) != SILC_LIST_END)
+ silc_packet_free(packet);
+
+ silc_mutex_unlock(pw->wait_lock);
+ silc_cond_free(pw->wait_cond);
+ silc_mutex_free(pw->wait_lock);
+ silc_free(pw);
+}
+
+/* Blocks thread until a packet has been received. */
+
+int silc_packet_wait(void *waiter, int timeout, SilcPacket *return_packet)
+{
+ SilcPacketWait pw = waiter;
+ SilcBool ret = FALSE;
+
+ silc_mutex_lock(pw->wait_lock);
+
+ /* Wait here until packet has arrived */
+ while (silc_list_count(pw->packet_queue) == 0) {
+ if (silc_unlikely(pw->stopped)) {
+ silc_mutex_unlock(pw->wait_lock);
+ return -1;
+ }
+ ret = silc_cond_timedwait(pw->wait_cond, pw->wait_lock, timeout);
+ }
+
+ /* Return packet */
+ silc_list_start(pw->packet_queue);
+ *return_packet = silc_list_get(pw->packet_queue);
+ silc_list_del(pw->packet_queue, *return_packet);
+
+ silc_mutex_unlock(pw->wait_lock);
+
+ return ret == TRUE ? 1 : 0;
+}
+
+/************************** Packet Stream Wrapper ***************************/
+
+/* Packet stream wrapper receive callback */
+static SilcBool
+silc_packet_wrap_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context);
+
+const SilcStreamOps silc_packet_stream_ops;
+
+/* Packet stream wrapper context */
+typedef struct {
+ const SilcStreamOps *ops;
+ SilcPacketStream stream;
+ SilcMutex lock;
+ void *waiter; /* Waiter context in blocking mode */
+ SilcPacketWrapCoder coder;
+ void *coder_context;
+ SilcBuffer encbuf;
+ SilcStreamNotifier callback;
+ void *context;
+ SilcList in_queue;
+ SilcPacketType type;
+ SilcPacketFlags flags;
+ unsigned int closed : 1;
+ unsigned int blocking : 1;
+ unsigned int read_more : 1;
+} *SilcPacketWrapperStream;
+
+/* Packet wrapper callbacks */
+static const SilcPacketCallbacks silc_packet_wrap_cbs =
+{
+ silc_packet_wrap_packet_receive, NULL, NULL
+};
+
+/* Packet stream wrapper receive callback, non-blocking mode */
+
+static SilcBool
+silc_packet_wrap_packet_receive(SilcPacketEngine engine,
+ SilcPacketStream stream,
+ SilcPacket packet,
+ void *callback_context,
+ void *stream_context)
+{
+ SilcPacketWrapperStream pws = callback_context;
+
+ if (pws->closed || !pws->callback)
+ return FALSE;
+
+ silc_mutex_lock(pws->lock);
+ silc_list_add(pws->in_queue, packet);
+ silc_mutex_unlock(pws->lock);
+
+ /* Call notifier callback */
+ pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
+
+ return TRUE;
+}
+
+/* Task callback to notify more data is available for reading */
+
+SILC_TASK_CALLBACK(silc_packet_wrap_read_more)
+{
+ SilcPacketWrapperStream pws = context;
+
+ if (pws->closed || !pws->callback)
+ return;
+
+ /* Call notifier callback */
+ pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context);
+}
+
+/* Read SILC packet */
+
+int silc_packet_wrap_read(SilcStream stream, unsigned char *buf,
+ SilcUInt32 buf_len)
+{
+ SilcPacketWrapperStream pws = stream;
+ SilcPacket packet;
+ SilcBool read_more = FALSE;
+ int len;
+
+ if (pws->closed)
+ return -2;
+
+ if (pws->blocking) {
+ /* Block until packet is received */
+ if ((silc_packet_wait(pws->waiter, 0, &packet)) < 0)
+ return -2;
+ if (pws->closed)
+ return -2;
+ } else {
+ /* Non-blocking mode */
+ silc_mutex_lock(pws->lock);
+ if (!silc_list_count(pws->in_queue)) {
+ silc_mutex_unlock(pws->lock);
+ return -1;
+ }
+
+ silc_list_start(pws->in_queue);
+ packet = silc_list_get(pws->in_queue);
+ silc_list_del(pws->in_queue, packet);
+ silc_mutex_unlock(pws->lock);
+ }
+
+ /* Call decoder if set */
+ if (pws->coder && !pws->read_more)
+ pws->coder(stream, SILC_STREAM_CAN_READ, &packet->buffer,
+ pws->coder_context);
+
+ len = silc_buffer_len(&packet->buffer);
+ if (len > buf_len) {
+ len = buf_len;
+ read_more = TRUE;
+ }
+
+ /* Read data */
+ memcpy(buf, packet->buffer.data, len);
+
+ if (read_more && !pws->blocking) {
+ /* More data will be available (in blocking mode not supported). */
+ silc_buffer_pull(&packet->buffer, len);
+ silc_list_insert(pws->in_queue, NULL, packet);
+ silc_schedule_task_add_timeout(pws->stream->sc->schedule,
+ silc_packet_wrap_read_more, pws, 0, 0);
+ pws->read_more = TRUE;
+ return len;
+ }
+
+ pws->read_more = FALSE;
+ silc_packet_free(packet);
+ return len;
+}
+
+/* Write SILC packet */
+
+int silc_packet_wrap_write(SilcStream stream, const unsigned char *data,
+ SilcUInt32 data_len)
+{
+ SilcPacketWrapperStream pws = stream;
+ SilcBool ret = FALSE;
+
+ /* Call encoder if set */
+ if (pws->coder) {
+ silc_buffer_reset(pws->encbuf);
+ ret = pws->coder(stream, SILC_STREAM_CAN_WRITE, pws->encbuf,
+ pws->coder_context);
+ }
+
+ /* Send the SILC packet */
+ if (ret) {
+ if (!silc_packet_send_va(pws->stream, pws->type, pws->flags,
+ SILC_STR_DATA(silc_buffer_data(pws->encbuf),
+ silc_buffer_len(pws->encbuf)),
+ SILC_STR_DATA(data, data_len),
+ SILC_STR_END))
+ return -2;
+ } else {
+ if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len))
+ return -2;
+ }
+
+ return data_len;
+}
+
+/* Close stream */
+
+SilcBool silc_packet_wrap_close(SilcStream stream)
+{
+ SilcPacketWrapperStream pws = stream;
+
+ if (pws->closed)
+ return TRUE;
+
+ if (pws->blocking) {
+ /* Close packet waiter */
+ silc_packet_wait_uninit(pws->waiter, pws->stream);
+ } else {
+ /* Unlink */
+ if (pws->callback)
+ silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+ }
+ pws->closed = TRUE;
+
+ return TRUE;
+}
+
+/* Destroy wrapper stream */
+
+void silc_packet_wrap_destroy(SilcStream stream)
+
+{
+ SilcPacketWrapperStream pws = stream;
+ SilcPacket packet;
+
+ SILC_LOG_DEBUG(("Destroying wrapped packet stream %p", pws));
+
+ silc_stream_close(stream);
+ silc_list_start(pws->in_queue);
+ while ((packet = silc_list_get(pws->in_queue)))
+ silc_packet_free(packet);
+ if (pws->lock)
+ silc_mutex_free(pws->lock);
+ if (pws->encbuf)
+ silc_buffer_free(pws->encbuf);
+ silc_packet_stream_unref(pws->stream);
+
+ silc_free(pws);
+}
+
+/* Link stream to receive packets */
+
+SilcBool silc_packet_wrap_notifier(SilcStream stream,
+ SilcSchedule schedule,
+ SilcStreamNotifier callback,
+ void *context)
+{
+ SilcPacketWrapperStream pws = stream;
+
+ if (pws->closed || pws->blocking)
+ return FALSE;
+
+ /* Link to receive packets */
+ if (callback)
+ silc_packet_stream_link(pws->stream, &silc_packet_wrap_cbs, pws,
+ 100000, pws->type, -1);
+ else
+ silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws);
+
+ pws->callback = callback;
+ pws->context = context;
+
+ return TRUE;
+}
+
+/* Return schedule */
+
+SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream)
+{
+ return NULL;
+}
+
+/* Wraps packet stream into SilcStream. */
+
+SilcStream silc_packet_stream_wrap(SilcPacketStream stream,
+ SilcPacketType type,
+ SilcPacketFlags flags,
+ SilcBool blocking_mode,
+ SilcPacketWrapCoder coder,
+ void *context)
+{
+ SilcPacketWrapperStream pws;
+
+ pws = silc_calloc(1, sizeof(*pws));
+ if (!pws)
+ return NULL;
+
+ SILC_LOG_DEBUG(("Wrapping packet stream %p to stream %p", stream, pws));
+
+ pws->ops = &silc_packet_stream_ops;
+ pws->stream = stream;
+ pws->type = type;
+ pws->flags = flags;
+ pws->blocking = blocking_mode;
+ pws->coder = coder;
+ pws->coder_context = context;
+
+ /* Allocate small amount for encoder buffer. */
+ if (pws->coder)
+ pws->encbuf = silc_buffer_alloc(8);
+
+ if (pws->blocking) {
+ /* Blocking mode. Use packet waiter to do the thing. */
+ pws->waiter = silc_packet_wait_init(pws->stream, NULL, pws->type, -1);
+ if (!pws->waiter) {
+ silc_free(pws);
+ return NULL;
+ }
+ } else {
+ /* Non-blocking mode */
+ silc_mutex_alloc(&pws->lock);
+ silc_list_init(pws->in_queue, struct SilcPacketStruct, next);