+ if (silc_unlikely(i == -2)) {
+ /* Error */
+ silc_buffer_reset(&ps->outbuf);
+ silc_mutex_unlock(ps->lock);
+ SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_WRITE);
+ return FALSE;
+ }
+
+ if (silc_unlikely(i == -1)) {
+ /* Cannot write now, write later. */
+ if (!no_unlock)
+ silc_mutex_unlock(ps->lock);
+ return TRUE;
+ }
+
+ /* Wrote data */
+ silc_buffer_pull(&ps->outbuf, i);
+ }
+
+ silc_buffer_reset(&ps->outbuf);
+ if (!no_unlock)
+ silc_mutex_unlock(ps->lock);
+
+ return TRUE;
+}
+
+/* Reads data from stream. Must be called with ps->lock locked. If this
+ returns FALSE the lock has been unlocked. If this returns packet stream
+ to `ret_ps' its lock has been acquired and `ps' lock has been unlocked.
+ It is returned if the stream is UDP and remote UDP stream exists for
+ the sender of the packet. */
+
+static inline SilcBool silc_packet_stream_read(SilcPacketStream ps,
+ SilcPacketStream *ret_ps)
+{
+ SilcStream stream = ps->stream;
+ SilcBuffer inbuf;
+ SilcBool connected;
+ int ret;
+
+ /* Get inbuf. If there is already some data for this stream in the buffer
+ we already have it. Otherwise get the current one from list, it will
+ include the data. */
+ inbuf = ps->inbuf;
+ if (!inbuf) {
+ silc_dlist_start(ps->sc->inbufs);
+ inbuf = silc_dlist_get(ps->sc->inbufs);
+ if (!inbuf) {
+ /* Allocate new data input buffer */
+ inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65);
+ if (!inbuf) {
+ silc_mutex_unlock(ps->lock);
+ return FALSE;
+ }
+ silc_buffer_reset(inbuf);
+ silc_dlist_add(ps->sc->inbufs, inbuf);
+ }
+ }
+
+ /* Make sure there is enough room to read */
+ if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf))
+ silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) +
+ (SILC_PACKET_DEFAULT_SIZE * 2));
+
+ if (silc_socket_stream_is_udp(stream, &connected)) {
+ if (!connected) {
+ /* Connectionless UDP stream, read one UDP packet */
+ char remote_ip[64], tuple[64];
+ int remote_port;
+ SilcPacketStream remote;
+
+ ret = silc_net_udp_receive(stream, remote_ip, sizeof(remote_ip),
+ &remote_port, inbuf->tail,
+ silc_buffer_taillen(inbuf));
+
+ if (silc_unlikely(ret < 0)) {
+ silc_mutex_unlock(ps->lock);
+ if (ret == -1) {
+ /* Cannot read now, do it later. */
+ return FALSE;
+ }
+
+ /* Error */
+ silc_buffer_reset(inbuf);
+ SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
+ return FALSE;
+ }
+
+ /* See if remote packet stream exist for this sender */
+ silc_snprintf(tuple, sizeof(tuple), "%d%s", remote_port, remote_ip);
+ silc_mutex_lock(ps->sc->engine->lock);
+ if (silc_hash_table_find(ps->sc->engine->udp_remote, tuple, NULL,
+ (void *)&remote)) {
+ silc_mutex_unlock(ps->sc->engine->lock);
+ SILC_LOG_DEBUG(("UDP packet from %s:%d for stream %p", remote_ip,
+ remote_port, remote));
+ silc_mutex_unlock(ps->lock);
+ silc_mutex_lock(remote->lock);
+ *ret_ps = remote;
+ return TRUE;
+ }
+ silc_mutex_unlock(ps->sc->engine->lock);
+
+ /* Unknown sender */
+ if (!ps->remote_udp) {
+ ps->remote_udp = silc_calloc(1, sizeof(*ps->remote_udp));
+ if (silc_unlikely(!ps->remote_udp)) {
+ silc_mutex_unlock(ps->lock);
+ SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_NO_MEMORY);
+ return FALSE;
+ }
+ }
+
+ /* Save sender IP and port */
+ silc_free(ps->remote_udp->remote_ip);
+ ps->remote_udp->remote_ip = strdup(remote_ip);
+ ps->remote_udp->remote_port = remote_port;
+
+ silc_buffer_pull_tail(inbuf, ret);
+ return TRUE;
+ }
+ }
+
+ /* Read data from the stream */
+ ret = silc_stream_read(stream, inbuf->tail, silc_buffer_taillen(inbuf));
+ if (silc_unlikely(ret <= 0)) {
+ silc_mutex_unlock(ps->lock);
+ if (ret == 0) {
+ /* EOS */
+ silc_buffer_reset(inbuf);
+ SILC_PACKET_CALLBACK_EOS(ps);
+ return FALSE;
+ }
+
+ if (ret == -1) {
+ /* Cannot read now, do it later. */
+ return FALSE;
+ }
+
+ /* Error */
+ silc_buffer_reset(inbuf);
+ SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
+ return FALSE;
+ }
+
+ silc_buffer_pull_tail(inbuf, ret);
+ return TRUE;
+}
+
+/* Our stream IO notifier callback. */
+
+static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
+ void *context)
+{
+ SilcPacketStream remote = NULL, ps = context;
+
+ silc_mutex_lock(ps->lock);
+
+ if (silc_unlikely(ps->destroyed)) {
+ silc_mutex_unlock(ps->lock);
+ return;
+ }
+
+ switch (status) {
+ case SILC_STREAM_CAN_READ:
+ /* Reading is locked also with stream->lock because we may be reading
+ at the same time other thread is writing to same underlaying stream. */
+ SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps));
+
+ /* Read data from stream */
+ if (!silc_packet_stream_read(ps, &remote))
+ return;
+
+ /* Now process the data */
+ silc_packet_stream_ref(ps);
+ if (!remote) {
+ silc_packet_read_process(ps);
+ silc_mutex_unlock(ps->lock);
+ } else {
+ silc_packet_read_process(remote);
+ silc_mutex_unlock(remote->lock);
+ }
+ silc_packet_stream_unref(ps);
+ break;
+
+ case SILC_STREAM_CAN_WRITE:
+ SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p",
+ ps->stream, ps));
+
+ if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) {
+ silc_mutex_unlock(ps->lock);
+ return;
+ }
+
+ /* Write pending data to stream */
+ silc_packet_stream_write(ps, FALSE);
+ break;
+
+ default:
+ silc_mutex_unlock(ps->lock);
+ break;
+ }
+}
+
+/* Allocate packet */
+
+static SilcPacket silc_packet_alloc(SilcPacketEngine engine)
+{
+ SilcPacket packet;
+
+ SILC_LOG_DEBUG(("Packet pool count %d",
+ silc_list_count(engine->packet_pool)));
+
+ silc_mutex_lock(engine->lock);
+
+ /* Get packet from freelist or allocate new one. */
+ packet = silc_list_get(engine->packet_pool);
+ if (!packet) {
+ void *tmp;
+
+ silc_mutex_unlock(engine->lock);
+
+ packet = silc_calloc(1, sizeof(*packet));
+ if (silc_unlikely(!packet))
+ return NULL;
+
+ SILC_LOG_DEBUG(("Allocating new packet %p", packet));
+
+ tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
+ if (silc_unlikely(!tmp)) {
+ silc_free(packet);
+ return NULL;
+ }
+ silc_buffer_set(&packet->buffer, tmp, SILC_PACKET_DEFAULT_SIZE);
+ silc_buffer_reset(&packet->buffer);
+
+ return packet;
+ }
+
+ SILC_LOG_DEBUG(("Get packet %p", packet));
+
+ /* Delete from freelist */
+ silc_list_del(engine->packet_pool, packet);
+
+ silc_mutex_unlock(engine->lock);
+
+ return packet;
+}
+
+/* UDP remote stream hash table destructor */
+
+static void silc_packet_engine_hash_destr(void *key, void *context,
+ void *user_context)
+{
+ silc_free(key);
+}
+
+/* Per scheduler context hash table destructor */
+
+static void silc_packet_engine_context_destr(void *key, void *context,
+ void *user_context)
+{
+ SilcPacketEngineContext sc = context;
+ SilcBuffer buffer;
+
+ silc_dlist_start(sc->inbufs);
+ while ((buffer = silc_dlist_get(sc->inbufs))) {
+ silc_buffer_clear(buffer);
+ silc_buffer_free(buffer);
+ silc_dlist_del(sc->inbufs, buffer);
+ }
+
+ silc_dlist_uninit(sc->inbufs);
+ silc_free(sc);
+}
+
+
+/******************************** Packet API ********************************/