X-Git-Url: http://git.silcnet.org/gitweb/?p=silc.git;a=blobdiff_plain;f=lib%2Fsilccore%2Fsilcpacket.c;h=614dc894650ce0e334051a508d1c052cef6e0d44;hp=55447f3d93a6bf571ed05e66fecda6e2fbdbf4b4;hb=3661cbc69ce24b5230c8602b24927eb841933b5e;hpb=690216574e05c9dcc1e78d6677d4cc82c3d8baa8 diff --git a/lib/silccore/silcpacket.c b/lib/silccore/silcpacket.c index 55447f3d..614dc894 100644 --- a/lib/silccore/silcpacket.c +++ b/lib/silccore/silcpacket.c @@ -4,7 +4,7 @@ Author: Pekka Riikonen - Copyright (C) 1997 - 2006 Pekka Riikonen + Copyright (C) 1997 - 2008 Pekka Riikonen This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -31,7 +31,7 @@ typedef struct { SilcSchedule schedule; /* The scheduler */ SilcPacketEngine engine; /* Packet engine */ - SilcBufferStruct inbuf; /* Data input buffer */ + SilcDList inbufs; /* Data inbut buffer list */ SilcUInt32 stream_count; /* Number of streams using this */ } *SilcPacketEngineContext; @@ -67,11 +67,12 @@ struct SilcPacketStreamStruct { struct SilcPacketStreamStruct *next; SilcPacketEngineContext sc; /* Per scheduler context */ SilcStream stream; /* Underlaying stream */ - SilcMutex lock; /* Stream lock */ + SilcMutex lock; /* Packet stream lock */ SilcDList process; /* Packet processors, or NULL */ SilcPacketRemoteUDP remote_udp; /* UDP remote stream tuple, or NULL */ void *stream_context; /* Stream context */ SilcBufferStruct outbuf; /* Out buffer */ + SilcBuffer inbuf; /* Inbuf from inbuf list or NULL */ SilcCipher send_key[2]; /* Sending key */ SilcHmac send_hmac[2]; /* Sending HMAC */ SilcCipher receive_key[2]; /* Receiving key */ @@ -291,13 +292,34 @@ static inline SilcBool silc_packet_stream_write(SilcPacketStream ps, static inline SilcBool silc_packet_stream_read(SilcPacketStream ps, SilcPacketStream *ret_ps) { - SilcStream stream; + SilcStream stream = ps->stream; SilcBuffer inbuf; SilcBool connected; int ret; - stream = ps->stream; - inbuf = &ps->sc->inbuf; + /* Get inbuf. If there is already some data for this stream in the buffer + we already have it. Otherwise get the current one from list, it will + include the data. */ + inbuf = ps->inbuf; + if (!inbuf) { + silc_dlist_start(ps->sc->inbufs); + inbuf = silc_dlist_get(ps->sc->inbufs); + if (!inbuf) { + /* Allocate new data input buffer */ + inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65); + if (!inbuf) { + silc_mutex_unlock(ps->lock); + return FALSE; + } + silc_buffer_reset(inbuf); + silc_dlist_add(ps->sc->inbufs, inbuf); + } + } + + /* Make sure there is enough room to read */ + if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf)) + silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) + + (SILC_PACKET_DEFAULT_SIZE * 2)); if (silc_socket_stream_is_udp(stream, &connected)) { if (!connected) { @@ -314,7 +336,6 @@ static inline SilcBool silc_packet_stream_read(SilcPacketStream ps, silc_mutex_unlock(ps->lock); if (ret == -1) { /* Cannot read now, do it later. */ - silc_buffer_pull(inbuf, silc_buffer_len(inbuf)); return FALSE; } @@ -372,7 +393,6 @@ static inline SilcBool silc_packet_stream_read(SilcPacketStream ps, if (ret == -1) { /* Cannot read now, do it later. */ - silc_buffer_pull(inbuf, silc_buffer_len(inbuf)); return FALSE; } @@ -402,7 +422,9 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status, switch (status) { case SILC_STREAM_CAN_READ: - SILC_LOG_DEBUG(("Reading data from stream")); + /* Reading is locked also with stream->lock because we may be reading + at the same time other thread is writing to same underlaying stream. */ + SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps)); /* Read data from stream */ if (!silc_packet_stream_read(ps, &remote)) @@ -421,7 +443,8 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status, break; case SILC_STREAM_CAN_WRITE: - SILC_LOG_DEBUG(("Writing pending data to stream")); + SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p", + ps->stream, ps)); if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) { silc_mutex_unlock(ps->lock); @@ -497,8 +520,16 @@ static void silc_packet_engine_context_destr(void *key, void *context, void *user_context) { SilcPacketEngineContext sc = context; - silc_buffer_clear(&sc->inbuf); - silc_buffer_purge(&sc->inbuf); + SilcBuffer buffer; + + silc_dlist_start(sc->inbufs); + while ((buffer = silc_dlist_get(sc->inbufs))) { + silc_buffer_clear(buffer); + silc_buffer_free(buffer); + silc_dlist_del(sc->inbufs, buffer); + } + + silc_dlist_uninit(sc->inbufs); silc_free(sc); } @@ -571,17 +602,79 @@ silc_packet_engine_start(SilcRng rng, SilcBool router, void silc_packet_engine_stop(SilcPacketEngine engine) { + SilcPacket packet; SILC_LOG_DEBUG(("Stopping packet engine")); if (!engine) return; - /* XXX */ + /* Free packet free list */ + silc_list_start(engine->packet_pool); + while ((packet = silc_list_get(engine->packet_pool))) { + silc_buffer_purge(&packet->buffer); + silc_free(packet); + } + silc_hash_table_free(engine->contexts); + silc_mutex_free(engine->lock); silc_free(engine); } +static const char * const packet_error[] = { + "Cannot read from stream", + "Cannot write to stream", + "Packet MAC failed", + "Packet decryption failed", + "Unknown SID", + "Packet is malformed", + "System out of memory", +}; + +/* Return packet error string */ + +const char *silc_packet_error_string(SilcPacketError error) +{ + if (error < SILC_PACKET_ERR_READ || error > SILC_PACKET_ERR_NO_MEMORY) + return ""; + return packet_error[error]; +} + +/* Return list of packet streams in the engine */ + +SilcDList silc_packet_engine_get_streams(SilcPacketEngine engine) +{ + SilcDList list; + SilcPacketStream ps; + + list = silc_dlist_init(); + if (!list) + return NULL; + + silc_mutex_lock(engine->lock); + silc_list_start(engine->streams); + while ((ps = silc_list_get(engine->streams))) { + silc_packet_stream_ref(ps); + silc_dlist_add(list, ps); + } + silc_mutex_unlock(engine->lock); + + return list; +} + +/* Free list returned by silc_packet_engine_get_streams */ + +void silc_packet_engine_free_streams_list(SilcDList streams) +{ + SilcPacketStream ps; + + silc_dlist_start(streams); + while ((ps = silc_dlist_get(streams))) + silc_packet_stream_unref(ps); + + silc_dlist_uninit(streams); +} + /* Create new packet stream */ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, @@ -589,6 +682,7 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, SilcStream stream) { SilcPacketStream ps; + SilcBuffer inbuf; void *tmp; SILC_LOG_DEBUG(("Creating new packet stream")); @@ -627,32 +721,43 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, (void *)&ps->sc)) { ps->sc = silc_calloc(1, sizeof(*ps->sc)); if (!ps->sc) { - silc_packet_stream_destroy(ps); silc_mutex_unlock(engine->lock); + silc_packet_stream_destroy(ps); return NULL; } ps->sc->engine = engine; ps->sc->schedule = schedule; /* Allocate data input buffer */ - tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE * 31); - if (!tmp) { + inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65); + if (!inbuf) { silc_free(ps->sc); ps->sc = NULL; + silc_mutex_unlock(engine->lock); silc_packet_stream_destroy(ps); + return NULL; + } + silc_buffer_reset(inbuf); + + ps->sc->inbufs = silc_dlist_init(); + if (!ps->sc->inbufs) { + silc_buffer_free(inbuf); + silc_free(ps->sc); + ps->sc = NULL; silc_mutex_unlock(engine->lock); + silc_packet_stream_destroy(ps); return NULL; } - silc_buffer_set(&ps->sc->inbuf, tmp, SILC_PACKET_DEFAULT_SIZE * 31); - silc_buffer_reset(&ps->sc->inbuf); + silc_dlist_add(ps->sc->inbufs, inbuf); /* Add to per scheduler context hash table */ if (!silc_hash_table_add(engine->contexts, schedule, ps->sc)) { - silc_buffer_purge(&ps->sc->inbuf); + silc_buffer_free(inbuf); + silc_dlist_del(ps->sc->inbufs, inbuf); silc_free(ps->sc); ps->sc = NULL; - silc_packet_stream_destroy(ps); silc_mutex_unlock(engine->lock); + silc_packet_stream_destroy(ps); return NULL; } } @@ -667,10 +772,18 @@ SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine, silc_hash_string_compare, NULL, silc_packet_engine_hash_destr, NULL, TRUE); + silc_mutex_unlock(engine->lock); /* Set IO notifier callback. This schedules this stream for I/O. */ - silc_stream_set_notifier(ps->stream, schedule, silc_packet_stream_io, ps); + if (!silc_stream_set_notifier(ps->stream, schedule, + silc_packet_stream_io, ps)) { + SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream")); + silc_packet_stream_destroy(ps); + return NULL; + } + + SILC_LOG_DEBUG(("Created packet stream %p", ps)); return ps; } @@ -766,11 +879,21 @@ SilcPacketStream silc_packet_stream_add_remote(SilcPacketStream stream, void silc_packet_stream_destroy(SilcPacketStream stream) { + SilcPacketEngine engine; + if (!stream) return; - if (silc_atomic_get_int8(&stream->refcnt) > 1) { + if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) { + if (stream->destroyed) + return; stream->destroyed = TRUE; + + SILC_LOG_DEBUG(("Marking packet stream %p destroyed", stream)); + + /* Close the underlaying stream */ + if (!stream->udp && stream->stream) + silc_stream_close(stream->stream); return; } @@ -778,17 +901,18 @@ void silc_packet_stream_destroy(SilcPacketStream stream) if (!stream->udp) { /* Delete from engine */ - silc_mutex_lock(stream->sc->engine->lock); - silc_list_del(stream->sc->engine->streams, stream); - - /* Remove per scheduler context, if it is not used anymore */ if (stream->sc) { + engine = stream->sc->engine; + silc_mutex_lock(engine->lock); + silc_list_del(engine->streams, stream); + + /* Remove per scheduler context, if it is not used anymore */ stream->sc->stream_count--; if (!stream->sc->stream_count) - silc_hash_table_del(stream->sc->engine->contexts, - stream->sc->schedule); + silc_hash_table_del(engine->contexts, stream->sc->schedule); + + silc_mutex_unlock(engine->lock); } - silc_mutex_unlock(stream->sc->engine->lock); /* Destroy the underlaying stream */ if (stream->stream) @@ -796,11 +920,13 @@ void silc_packet_stream_destroy(SilcPacketStream stream) } else { /* Delete from UDP remote hash table */ char tuple[64]; - silc_snprintf(tuple, sizeof(tuple), "%d%s", stream->remote_udp->remote_port, - stream->remote_udp->remote_ip); - silc_mutex_lock(stream->sc->engine->lock); - silc_hash_table_del(stream->sc->engine->udp_remote, tuple); - silc_mutex_unlock(stream->sc->engine->lock); + engine = stream->sc->engine; + silc_snprintf(tuple, sizeof(tuple), "%d%s", + stream->remote_udp->remote_port, + stream->remote_udp->remote_ip); + silc_mutex_lock(engine->lock); + silc_hash_table_del(engine->udp_remote, tuple); + silc_mutex_unlock(engine->lock); silc_free(stream->remote_udp->remote_ip); silc_free(stream->remote_udp); @@ -824,13 +950,40 @@ void silc_packet_stream_destroy(SilcPacketStream stream) silc_dlist_uninit(stream->process); } - /* XXX */ + /* Destroy ciphers and HMACs */ + if (stream->send_key[0]) + silc_cipher_free(stream->send_key[0]); + if (stream->receive_key[0]) + silc_cipher_free(stream->receive_key[0]); + if (stream->send_hmac[0]) + silc_hmac_free(stream->send_hmac[0]); + if (stream->receive_hmac[0]) + silc_hmac_free(stream->receive_hmac[0]); + if (stream->send_key[1]) + silc_cipher_free(stream->send_key[1]); + if (stream->receive_key[1]) + silc_cipher_free(stream->receive_key[1]); + if (stream->send_hmac[1]) + silc_hmac_free(stream->send_hmac[1]); + if (stream->receive_hmac[1]) + silc_hmac_free(stream->receive_hmac[1]); + + /* Free IDs */ + silc_free(stream->src_id); + silc_free(stream->dst_id); silc_atomic_uninit8(&stream->refcnt); silc_mutex_free(stream->lock); silc_free(stream); } +/* Return TRUE if the stream is valid */ + +SilcBool silc_packet_stream_is_valid(SilcPacketStream stream) +{ + return stream->destroyed == FALSE; +} + /* Marks as router stream */ void silc_packet_stream_set_router(SilcPacketStream stream) @@ -877,6 +1030,7 @@ static SilcBool silc_packet_stream_link_va(SilcPacketStream stream, stream->process = silc_dlist_init(); if (!stream->process) { silc_mutex_unlock(stream->lock); + silc_free(p); return FALSE; } } @@ -1001,14 +1155,22 @@ SilcBool silc_packet_get_sender(SilcPacket packet, void silc_packet_stream_ref(SilcPacketStream stream) { silc_atomic_add_int8(&stream->refcnt, 1); + SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream, + silc_atomic_get_int8(&stream->refcnt) - 1, + silc_atomic_get_int8(&stream->refcnt))); } /* Unreference packet stream */ void silc_packet_stream_unref(SilcPacketStream stream) { - if (silc_atomic_sub_int8(&stream->refcnt, 1) == 0) - silc_packet_stream_destroy(stream); + SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream, + silc_atomic_get_int8(&stream->refcnt), + silc_atomic_get_int8(&stream->refcnt) - 1)); + if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) + return; + silc_atomic_add_int8(&stream->refcnt, 1); + silc_packet_stream_destroy(stream); } /* Return engine */ @@ -1104,7 +1266,7 @@ SilcBool silc_packet_set_keys(SilcPacketStream stream, SilcCipher send_key, } else { if (stream->send_key[0] && send_key) silc_cipher_free(stream->send_key[0]); - if (stream->send_key[1] && receive_key) + if (stream->receive_key[0] && receive_key) silc_cipher_free(stream->receive_key[0]); if (stream->send_hmac[0] && send_hmac) silc_hmac_free(stream->send_hmac[0]); @@ -1166,12 +1328,13 @@ SilcBool silc_packet_set_ids(SilcPacketStream stream, if (!src_id && !dst_id) return FALSE; - SILC_LOG_DEBUG(("Setting new IDs to packet stream")); - silc_mutex_lock(stream->lock); if (src_id) { + SILC_LOG_DEBUG(("Setting source ID to packet stream %p", stream)); + silc_free(stream->src_id); + stream->src_id = NULL; if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len)) { silc_mutex_unlock(stream->lock); return FALSE; @@ -1186,7 +1349,10 @@ SilcBool silc_packet_set_ids(SilcPacketStream stream, } if (dst_id) { + SILC_LOG_DEBUG(("Setting destination ID to packet stream %p", stream)); + silc_free(stream->dst_id); + stream->dst_id = NULL; if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len)) { silc_mutex_unlock(stream->lock); return FALSE; @@ -1205,6 +1371,31 @@ SilcBool silc_packet_set_ids(SilcPacketStream stream, return TRUE; } +/* Return IDs from the packet stream */ + +SilcBool silc_packet_get_ids(SilcPacketStream stream, + SilcBool *src_id_set, SilcID *src_id, + SilcBool *dst_id_set, SilcID *dst_id) +{ + if (src_id && stream->src_id) + if (!silc_id_str2id2(stream->src_id, stream->src_id_len, + stream->src_id_type, src_id)) + return FALSE; + + if (stream->src_id && src_id_set) + *src_id_set = TRUE; + + if (dst_id && stream->dst_id) + if (!silc_id_str2id2(stream->dst_id, stream->dst_id_len, + stream->dst_id_type, dst_id)) + return FALSE; + + if (stream->dst_id && dst_id_set) + *dst_id_set = TRUE; + + return TRUE; +} + /* Adds Security ID (SID) */ SilcBool silc_packet_set_sid(SilcPacketStream stream, SilcUInt8 sid) @@ -1282,12 +1473,7 @@ static inline void silc_packet_send_ctr_increment(SilcPacketStream stream, unsigned char *ret_iv) { unsigned char *iv = silc_cipher_get_iv(cipher); - SilcUInt32 pc; - - /* Increment packet counter */ - SILC_GET32_MSB(pc, iv + 8); - pc++; - SILC_PUT32_MSB(pc, iv + 8); + SilcUInt32 pc1, pc2; /* Reset block counter */ memset(iv + 12, 0, 4); @@ -1299,11 +1485,24 @@ static inline void silc_packet_send_ctr_increment(SilcPacketStream stream, ret_iv[1] = ret_iv[0] + iv[4]; ret_iv[2] = ret_iv[0] ^ ret_iv[1]; ret_iv[3] = ret_iv[0] + ret_iv[2]; - SILC_PUT32_MSB(pc, ret_iv + 4); + + /* Increment 32-bit packet counter */ + SILC_GET32_MSB(pc1, iv + 8); + pc1++; + SILC_PUT32_MSB(pc1, ret_iv + 4); + SILC_LOG_HEXDUMP(("IV"), ret_iv, 8); /* Set new nonce to counter block */ - memcpy(iv + 4, ret_iv, 4); + memcpy(iv + 4, ret_iv, 8); + } else { + /* Increment 64-bit packet counter */ + SILC_GET32_MSB(pc1, iv + 4); + SILC_GET32_MSB(pc2, iv + 8); + if (++pc2 == 0) + ++pc1; + SILC_PUT32_MSB(pc1, iv + 4); + SILC_PUT32_MSB(pc2, iv + 8); } SILC_LOG_HEXDUMP(("Counter Block"), iv, 16); @@ -1370,10 +1569,8 @@ static inline SilcBool silc_packet_send_raw(SilcPacketStream stream, type and flags, and calculate correct length. Private messages with private keys and channel messages are special packets as their payload is encrypted already. */ - if ((type == SILC_PACKET_PRIVATE_MESSAGE && - flags & SILC_PACKET_FLAG_PRIVMSG_KEY) || - type == SILC_PACKET_CHANNEL_MESSAGE) { - + if (type == SILC_PACKET_PRIVATE_MESSAGE && + flags & SILC_PACKET_FLAG_PRIVMSG_KEY) { /* Padding is calculated from header + IDs */ if (!ctr) SILC_PACKET_PADLEN((SILC_PACKET_HEADER_LEN + src_id_len + dst_id_len + @@ -1382,8 +1579,26 @@ static inline SilcBool silc_packet_send_raw(SilcPacketStream stream, /* Length to encrypt, header + IDs + padding. */ enclen = (SILC_PACKET_HEADER_LEN + src_id_len + dst_id_len + padlen + psnlen); - } else { + } else if (type == SILC_PACKET_CHANNEL_MESSAGE) { + if (stream->sc->engine->local_is_router && stream->is_router) { + /* Channel messages between routers are encrypted as normal packets. + Padding is calculated from true length of the packet. */ + if (!ctr) + SILC_PACKET_PADLEN(truelen + psnlen, block_len, padlen); + + enclen += padlen + psnlen; + } else { + /* Padding is calculated from header + IDs */ + if (!ctr) + SILC_PACKET_PADLEN((SILC_PACKET_HEADER_LEN + src_id_len + dst_id_len + + psnlen), block_len, padlen); + + /* Length to encrypt, header + IDs + padding. */ + enclen = (SILC_PACKET_HEADER_LEN + src_id_len + dst_id_len + + padlen + psnlen); + } + } else { /* Padding is calculated from true length of the packet */ if (flags & SILC_PACKET_FLAG_LONG_PAD) SILC_PACKET_PADLEN_MAX(truelen + psnlen, block_len, padlen); @@ -1441,6 +1656,7 @@ static inline SilcBool silc_packet_send_raw(SilcPacketStream stream, /* Encrypt the packet */ if (silc_likely(cipher)) { SILC_LOG_DEBUG(("Encrypting packet")); + silc_cipher_set_iv(cipher, NULL); if (silc_unlikely(!silc_cipher_encrypt(cipher, packet.data + ivlen, packet.data + ivlen, enclen, NULL))) { @@ -1631,16 +1847,19 @@ static inline void silc_packet_receive_ctr_increment(SilcPacketStream stream, unsigned char *iv, unsigned char *packet_iv) { - SilcUInt32 pc; + SilcUInt32 pc1, pc2; /* If IV Included flag, set the IV from packet to block counter. */ if (stream->iv_included) { memcpy(iv + 4, packet_iv, 8); } else { - /* Increment packet counter */ - SILC_GET32_MSB(pc, iv + 8); - pc++; - SILC_PUT32_MSB(pc, iv + 8); + /* Increment 64-bit packet counter. */ + SILC_GET32_MSB(pc1, iv + 4); + SILC_GET32_MSB(pc2, iv + 8); + if (++pc2 == 0) + ++pc1; + SILC_PUT32_MSB(pc1, iv + 4); + SILC_PUT32_MSB(pc2, iv + 8); } /* Reset block counter */ @@ -1766,8 +1985,8 @@ static inline SilcBool silc_packet_parse(SilcPacket packet) silc_buffer_len(buffer)), buffer->head, silc_buffer_headlen(buffer) + silc_buffer_len(buffer)); - SILC_LOG_DEBUG(("Incoming packet type: %d (%s)", packet->type, - silc_get_packet_name(packet->type))); + SILC_LOG_DEBUG(("Incoming packet type: %d (%s), flags %d", packet->type, + silc_get_packet_name(packet->type), packet->flags)); return TRUE; } @@ -1871,7 +2090,7 @@ static SilcBool silc_packet_dispatch(SilcPacket packet) static void silc_packet_read_process(SilcPacketStream stream) { - SilcBuffer inbuf = &stream->sc->inbuf; + SilcBuffer inbuf; SilcCipher cipher; SilcHmac hmac; SilcPacket packet; @@ -1883,6 +2102,15 @@ static void silc_packet_read_process(SilcPacketStream stream) SilcBool normal; int ret; + /* Get inbuf. If there is already some data for this stream in the buffer + we already have it. Otherwise get the current one from list, it will + include the data. */ + inbuf = stream->inbuf; + if (!inbuf) { + silc_dlist_start(stream->sc->inbufs); + inbuf = silc_dlist_get(stream->sc->inbufs); + } + /* Parse the packets from the data */ while (silc_buffer_len(inbuf) > 0) { ivlen = psnlen = 0; @@ -1894,6 +2122,8 @@ static void silc_packet_read_process(SilcPacketStream stream) (stream->iv_included ? SILC_PACKET_MIN_HEADER_LEN_IV : SILC_PACKET_MIN_HEADER_LEN))) { SILC_LOG_DEBUG(("Partial packet in queue, waiting for the rest")); + silc_dlist_del(stream->sc->inbufs, inbuf); + stream->inbuf = inbuf; return; } @@ -1936,8 +2166,7 @@ static void silc_packet_read_process(SilcPacketStream stream) silc_mutex_unlock(stream->lock); SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_UNKNOWN_SID); silc_mutex_lock(stream->lock); - silc_buffer_reset(inbuf); - return; + goto out; } } } else { @@ -1948,8 +2177,9 @@ static void silc_packet_read_process(SilcPacketStream stream) silc_packet_receive_ctr_increment(stream, iv, NULL); } - silc_cipher_decrypt(cipher, inbuf->data + ivlen, tmp, - block_len, iv); + if (silc_cipher_get_mode(cipher) == SILC_CIPHER_MODE_CTR) + silc_cipher_set_iv(cipher, NULL); + silc_cipher_decrypt(cipher, inbuf->data + ivlen, tmp, block_len, iv); header = tmp; if (stream->iv_included) { @@ -1974,8 +2204,7 @@ static void silc_packet_read_process(SilcPacketStream stream) SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MALFORMED); silc_mutex_lock(stream->lock); memset(tmp, 0, sizeof(tmp)); - silc_buffer_reset(inbuf); - return; + goto out; } if (silc_buffer_len(inbuf) < paddedlen + ivlen + mac_len) { @@ -1983,6 +2212,8 @@ static void silc_packet_read_process(SilcPacketStream stream) "(%d bytes)", paddedlen + mac_len - silc_buffer_len(inbuf))); memset(tmp, 0, sizeof(tmp)); + silc_dlist_del(stream->sc->inbufs, inbuf); + stream->inbuf = inbuf; return; } @@ -1996,8 +2227,7 @@ static void silc_packet_read_process(SilcPacketStream stream) SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MAC_FAILED); silc_mutex_lock(stream->lock); memset(tmp, 0, sizeof(tmp)); - silc_buffer_reset(inbuf); - return; + goto out; } /* Get packet */ @@ -2007,8 +2237,7 @@ static void silc_packet_read_process(SilcPacketStream stream) SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_NO_MEMORY); silc_mutex_lock(stream->lock); memset(tmp, 0, sizeof(tmp)); - silc_buffer_reset(inbuf); - return; + goto out; } packet->stream = stream; @@ -2023,8 +2252,7 @@ static void silc_packet_read_process(SilcPacketStream stream) silc_mutex_lock(stream->lock); silc_packet_free(packet); memset(tmp, 0, sizeof(tmp)); - silc_buffer_reset(inbuf); - return; + goto out; } } @@ -2069,7 +2297,7 @@ static void silc_packet_read_process(SilcPacketStream stream) silc_mutex_lock(stream->lock); silc_packet_free(packet); memset(tmp, 0, sizeof(tmp)); - return; + goto out; } stream->receive_psn++; @@ -2086,7 +2314,7 @@ static void silc_packet_read_process(SilcPacketStream stream) silc_mutex_lock(stream->lock); silc_packet_free(packet); memset(tmp, 0, sizeof(tmp)); - return; + goto out; } /* Dispatch the packet to application */ @@ -2094,6 +2322,13 @@ static void silc_packet_read_process(SilcPacketStream stream) break; } + out: + /* Add inbuf back to free list, if we owned it. */ + if (stream->inbuf) { + silc_dlist_add(stream->sc->inbufs, inbuf); + stream->inbuf = NULL; + } + silc_buffer_reset(inbuf); } @@ -2118,6 +2353,9 @@ typedef struct { SilcMutex wait_lock; SilcCond wait_cond; SilcList packet_queue; + unsigned char id[28]; + unsigned int id_type : 2; + unsigned int id_len : 5; unsigned int stopped : 1; } *SilcPacketWait; @@ -2132,6 +2370,13 @@ silc_packet_wait_packet_receive(SilcPacketEngine engine, { SilcPacketWait pw = callback_context; + /* If source ID is specified check for it */ + if (pw->id_len) { + if (pw->id_type != packet->src_id_type || + memcmp(pw->id, packet->src_id, pw->id_len)) + return FALSE; + } + /* Signal the waiting thread for a new packet */ silc_mutex_lock(pw->wait_lock); @@ -2150,7 +2395,8 @@ silc_packet_wait_packet_receive(SilcPacketEngine engine, /* Initialize packet waiting */ -void *silc_packet_wait_init(SilcPacketStream stream, ...) +void *silc_packet_wait_init(SilcPacketStream stream, + const SilcID *source_id, ...) { SilcPacketWait pw; SilcBool ret; @@ -2172,7 +2418,7 @@ void *silc_packet_wait_init(SilcPacketStream stream, ...) } /* Link to the packet stream for the requested packet types */ - va_start(ap, stream); + va_start(ap, source_id); ret = silc_packet_stream_link_va(stream, &silc_packet_wait_cbs, pw, 10000000, ap); va_end(ap); @@ -2186,6 +2432,14 @@ void *silc_packet_wait_init(SilcPacketStream stream, ...) /* Initialize packet queue */ silc_list_init(pw->packet_queue, struct SilcPacketStruct, next); + if (source_id) { + SilcUInt32 id_len; + silc_id_id2str(SILC_ID_GET_ID(*source_id), source_id->type, pw->id, + sizeof(pw->id), &id_len); + pw->id_type = source_id->type; + pw->id_len = id_len; + } + return (void *)pw; } @@ -2201,6 +2455,7 @@ void silc_packet_wait_uninit(void *waiter, SilcPacketStream stream) pw->stopped = TRUE; silc_cond_broadcast(pw->wait_cond); silc_mutex_unlock(pw->wait_lock); + silc_thread_yield(); /* Re-acquire lock and free resources */ silc_mutex_lock(pw->wait_lock); @@ -2261,12 +2516,19 @@ const SilcStreamOps silc_packet_stream_ops; typedef struct { const SilcStreamOps *ops; SilcPacketStream stream; + SilcMutex lock; + void *waiter; /* Waiter context in blocking mode */ + SilcPacketWrapCoder coder; + void *coder_context; + SilcBuffer encbuf; SilcStreamNotifier callback; void *context; SilcList in_queue; SilcPacketType type; SilcPacketFlags flags; unsigned int closed : 1; + unsigned int blocking : 1; + unsigned int read_more : 1; } *SilcPacketWrapperStream; /* Packet wrapper callbacks */ @@ -2275,7 +2537,7 @@ static SilcPacketCallbacks silc_packet_wrap_cbs = silc_packet_wrap_packet_receive, NULL, NULL }; -/* Packet stream wrapper receive callback */ +/* Packet stream wrapper receive callback, non-blocking mode */ static SilcBool silc_packet_wrap_packet_receive(SilcPacketEngine engine, @@ -2286,10 +2548,12 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine, { SilcPacketWrapperStream pws = callback_context; - if (!pws->closed || !pws->callback) + if (pws->closed || !pws->callback) return FALSE; + silc_mutex_lock(pws->lock); silc_list_add(pws->in_queue, packet); + silc_mutex_unlock(pws->lock); /* Call notifier callback */ pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context); @@ -2297,6 +2561,19 @@ silc_packet_wrap_packet_receive(SilcPacketEngine engine, return TRUE; } +/* Task callback to notify more data is available for reading */ + +SILC_TASK_CALLBACK(silc_packet_wrap_read_more) +{ + SilcPacketWrapperStream pws = context; + + if (pws->closed || !pws->callback) + return; + + /* Call notifier callback */ + pws->callback((SilcStream)pws, SILC_STREAM_CAN_READ, pws->context); +} + /* Read SILC packet */ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, @@ -2304,23 +2581,57 @@ int silc_packet_wrap_read(SilcStream stream, unsigned char *buf, { SilcPacketWrapperStream pws = stream; SilcPacket packet; + SilcBool read_more = FALSE; int len; if (pws->closed) return -2; - if (!silc_list_count(pws->in_queue)) - return -1; - silc_list_start(pws->in_queue); - packet = silc_list_get(pws->in_queue); - silc_list_del(pws->in_queue, packet); + if (pws->blocking) { + /* Block until packet is received */ + if ((silc_packet_wait(pws->waiter, 0, &packet)) < 0) + return -2; + if (pws->closed) + return -2; + } else { + /* Non-blocking mode */ + silc_mutex_lock(pws->lock); + if (!silc_list_count(pws->in_queue)) { + silc_mutex_unlock(pws->lock); + return -1; + } + + silc_list_start(pws->in_queue); + packet = silc_list_get(pws->in_queue); + silc_list_del(pws->in_queue, packet); + silc_mutex_unlock(pws->lock); + } + + /* Call decoder if set */ + if (pws->coder && !pws->read_more) + pws->coder(stream, SILC_STREAM_CAN_READ, &packet->buffer, + pws->coder_context); len = silc_buffer_len(&packet->buffer); - if (len > buf_len) + if (len > buf_len) { len = buf_len; + read_more = TRUE; + } + /* Read data */ memcpy(buf, packet->buffer.data, len); + if (read_more && !pws->blocking) { + /* More data will be available (in blocking mode not supported). */ + silc_buffer_pull(&packet->buffer, len); + silc_list_insert(pws->in_queue, NULL, packet); + silc_schedule_task_add_timeout(pws->stream->sc->schedule, + silc_packet_wrap_read_more, pws, 0, 0); + pws->read_more = TRUE; + return len; + } + + pws->read_more = FALSE; silc_packet_free(packet); return len; } @@ -2331,10 +2642,27 @@ int silc_packet_wrap_write(SilcStream stream, const unsigned char *data, SilcUInt32 data_len) { SilcPacketWrapperStream pws = stream; + SilcBool ret = FALSE; + + /* Call encoder if set */ + if (pws->coder) { + silc_buffer_reset(pws->encbuf); + ret = pws->coder(stream, SILC_STREAM_CAN_WRITE, pws->encbuf, + pws->coder_context); + } /* Send the SILC packet */ - if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len)) - return -2; + if (ret) { + if (!silc_packet_send_va(pws->stream, pws->type, pws->flags, + SILC_STR_DATA(silc_buffer_data(pws->encbuf), + silc_buffer_len(pws->encbuf)), + SILC_STR_DATA(data, data_len), + SILC_STR_END)) + return -2; + } else { + if (!silc_packet_send(pws->stream, pws->type, pws->flags, data, data_len)) + return -2; + } return data_len; } @@ -2348,9 +2676,14 @@ SilcBool silc_packet_wrap_close(SilcStream stream) if (pws->closed) return TRUE; - /* Unlink */ - if (pws->callback) - silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws); + if (pws->blocking) { + /* Close packet waiter */ + silc_packet_wait_uninit(pws->waiter, pws->stream); + } else { + /* Unlink */ + if (pws->callback) + silc_packet_stream_unlink(pws->stream, &silc_packet_wrap_cbs, pws); + } pws->closed = TRUE; return TRUE; @@ -2370,6 +2703,10 @@ void silc_packet_wrap_destroy(SilcStream stream) silc_list_start(pws->in_queue); while ((packet = silc_list_get(pws->in_queue))) silc_packet_free(packet); + if (pws->lock) + silc_mutex_free(pws->lock); + if (pws->encbuf) + silc_buffer_free(pws->encbuf); silc_packet_stream_unref(pws->stream); silc_free(pws); @@ -2377,15 +2714,15 @@ void silc_packet_wrap_destroy(SilcStream stream) /* Link stream to receive packets */ -void silc_packet_wrap_notifier(SilcStream stream, - SilcSchedule schedule, - SilcStreamNotifier callback, - void *context) +SilcBool silc_packet_wrap_notifier(SilcStream stream, + SilcSchedule schedule, + SilcStreamNotifier callback, + void *context) { SilcPacketWrapperStream pws = stream; - if (pws->closed) - return; + if (pws->closed || pws->blocking) + return FALSE; /* Link to receive packets */ if (callback) @@ -2396,6 +2733,8 @@ void silc_packet_wrap_notifier(SilcStream stream, pws->callback = callback; pws->context = context; + + return TRUE; } /* Return schedule */ @@ -2409,7 +2748,10 @@ SilcSchedule silc_packet_wrap_get_schedule(SilcStream stream) SilcStream silc_packet_stream_wrap(SilcPacketStream stream, SilcPacketType type, - SilcPacketFlags flags) + SilcPacketFlags flags, + SilcBool blocking_mode, + SilcPacketWrapCoder coder, + void *context) { SilcPacketWrapperStream pws; @@ -2423,8 +2765,27 @@ SilcStream silc_packet_stream_wrap(SilcPacketStream stream, pws->stream = stream; pws->type = type; pws->flags = flags; + pws->blocking = blocking_mode; + pws->coder = coder; + pws->coder_context = context; + + /* Allocate small amount for encoder buffer. */ + if (pws->coder) + pws->encbuf = silc_buffer_alloc(8); + + if (pws->blocking) { + /* Blocking mode. Use packet waiter to do the thing. */ + pws->waiter = silc_packet_wait_init(pws->stream, NULL, pws->type, -1); + if (!pws->waiter) { + silc_free(pws); + return NULL; + } + } else { + /* Non-blocking mode */ + silc_mutex_alloc(&pws->lock); + silc_list_init(pws->in_queue, struct SilcPacketStruct, next); + } - silc_list_init(pws->in_queue, struct SilcPacketStruct, next); silc_packet_stream_ref(stream); return (SilcStream)pws;