+#include "silc.h"
+
+/************************** Types and definitions ***************************/
+
/* Per scheduler (which usually means per thread) data.  We put per scheduler
   data here for accessing without locking.  SILC Schedule dictates that
   tasks are dispatched in one thread, hence the per scheduler context.
   Shared by all packet streams created on the same scheduler. */
typedef struct {
  SilcSchedule schedule;	      /* The scheduler owning this context */
  SilcPacketEngine engine;	      /* Packet engine */
  SilcDList inbufs;		      /* Data input buffer list */
  SilcUInt32 stream_count;	      /* Number of streams using this */
} *SilcPacketEngineContext;
+
/* Packet engine.  One engine may serve multiple schedulers/threads; the
   `lock' protects the shared lists and hash tables below. */
struct SilcPacketEngineStruct {
  SilcMutex lock;			 /* Engine lock */
  SilcRng rng;				 /* RNG for engine */
  SilcHashTable contexts;		 /* Per scheduler contexts */
  const SilcPacketCallbacks *callbacks;	 /* Packet callbacks */
  void *callback_context;		 /* Context for callbacks */
  SilcList streams;			 /* All streams in engine */
  SilcList packet_pool;			 /* Free list for received packets */
  SilcHashTable udp_remote;		 /* UDP remote streams, or NULL */
  unsigned int local_is_router : 1;	 /* Set if local is SILC router */
};
+
/* Packet processor context.  Links a set of packet types to the callbacks
   that process them; kept in the stream's `process' list ordered by
   descending priority. */
typedef struct SilcPacketProcessStruct {
  SilcPacketType *types;		 /* Packets to process, 0-terminated */
  const SilcPacketCallbacks *callbacks;	 /* Callbacks or NULL */
  void *callback_context;		 /* Context given to the callbacks */
  SilcInt32 priority;			 /* Priority */
} *SilcPacketProcess;
+
/* UDP remote stream tuple.  Identifies the remote sender of a
   connectionless UDP packet stream. */
typedef struct {
  char *remote_ip;			 /* Remote IP address (allocated) */
  SilcUInt16 remote_port;		 /* Remote port */
} *SilcPacketRemoteUDP;
+
/* Packet stream.  For remote UDP streams (`udp' set) the `stream' member
   points to the parent SilcPacketStream, not to a SilcStream.  The
   two-element key/HMAC arrays hold the current key at index 0 and, when
   IV Included mode is used, the previous key at index 1 for transitional
   decryption during rekey. */
struct SilcPacketStreamStruct {
  struct SilcPacketStreamStruct *next;	 /* Next in engine's stream list */
  SilcPacketEngineContext sc;		 /* Per scheduler context */
  SilcStream stream;			 /* Underlying stream */
  SilcMutex lock;			 /* Packet stream lock */
  SilcDList process;			 /* Packet processors, or NULL */
  SilcPacketRemoteUDP remote_udp;	 /* UDP remote stream tuple, or NULL */
  void *stream_context;			 /* Stream context */
  SilcBufferStruct outbuf;		 /* Out buffer */
  SilcBuffer inbuf;			 /* Inbuf from inbuf list or NULL */
  SilcCipher send_key[2];		 /* Sending key */
  SilcHmac send_hmac[2];		 /* Sending HMAC */
  SilcCipher receive_key[2];		 /* Receiving key */
  SilcHmac receive_hmac[2];		 /* Receiving HMAC */
  unsigned char *src_id;		 /* Source ID */
  unsigned char *dst_id;		 /* Destination ID */
  SilcUInt32 send_psn;			 /* Sending sequence */
  SilcUInt32 receive_psn;		 /* Receiving sequence */
  SilcAtomic8 refcnt;			 /* Reference counter */
  SilcUInt8 sid;			 /* Security ID, set if IV included */
  unsigned int src_id_len : 6;		 /* Source ID length */
  unsigned int src_id_type : 2;		 /* Source ID type */
  unsigned int dst_id_len : 6;		 /* Destination ID length */
  unsigned int dst_id_type : 2;		 /* Destination ID type */
  unsigned int is_router : 1;		 /* Set if router stream */
  unsigned int destroyed : 1;		 /* Set if destroyed */
  unsigned int iv_included : 1;		 /* Set if IV included */
  unsigned int udp : 1;			 /* UDP remote stream */
};
+
/* Initial size of stream buffers */
#define SILC_PACKET_DEFAULT_SIZE  1024

/* Header length without source and destination ID's. */
#define SILC_PACKET_HEADER_LEN    10

/* Minimum length of SILC Packet Header. */
#define SILC_PACKET_MIN_HEADER_LEN 16
/* Parenthesized so the macro expands safely inside larger expressions
   (e.g. multiplication would otherwise bind only to the trailing 1). */
#define SILC_PACKET_MIN_HEADER_LEN_IV (32 + 1)

/* Maximum padding length */
#define SILC_PACKET_MAX_PADLEN 128

/* Default padding length */
#define SILC_PACKET_DEFAULT_PADLEN 16

/* Minimum packet length */
#define SILC_PACKET_MIN_LEN (SILC_PACKET_HEADER_LEN + 1)
+
/* Returns true length of the packet.  Reads the 16-bit MSB-first packet
   length from the start of `__packetdata' into `__ret_truelen', and the
   padded length (true length + pad length byte at offset 4) into
   `__ret_paddedlen'. */
#define SILC_PACKET_LENGTH(__packetdata, __ret_truelen, __ret_paddedlen) \
do {									 \
  SILC_GET16_MSB((__ret_truelen), (__packetdata));			 \
  (__ret_paddedlen) = (__ret_truelen) + (SilcUInt8)(__packetdata)[4];	 \
} while(0)
+
/* Calculates the data length with given header length.  This macro
   can be used to check whether the data_len with header_len exceeds
   SILC_PACKET_MAX_LEN.  If it does, this returns the new data_len
   so that the SILC_PACKET_MAX_LEN is not exceeded.  If the data_len
   plus header_len fits SILC_PACKET_MAX_LEN the returned data length
   is the data_len given as argument.  Arguments are fully parenthesized
   so compound expressions expand correctly. */
#define SILC_PACKET_DATALEN(data_len, header_len)			  \
  (((data_len) + (header_len)) > SILC_PACKET_MAX_LEN ?			  \
   (data_len) - (((data_len) + (header_len)) - SILC_PACKET_MAX_LEN) :	  \
   (data_len))
+
/* Calculates the length of the padding in the packet.  Pads `__packetlen'
   up to the cipher block size `__blocklen' (or to the default pad length
   when the block length is zero, e.g. no cipher set).  A minimum of 8
   pad bytes is always produced, so one extra block is added when the
   computed padding would be shorter. */
#define SILC_PACKET_PADLEN(__packetlen, __blocklen, __padlen)		    \
do {									    \
  __padlen = (SILC_PACKET_DEFAULT_PADLEN - (__packetlen) %		    \
	      ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN));  \
  if (__padlen < 8)							    \
    __padlen += ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN); \
} while(0)
+
/* Returns the length of the padding up to the maximum length, which
   is 128 bytes.  Used to pad to the maximum for packets whose length
   should be concealed. */
#define SILC_PACKET_PADLEN_MAX(__packetlen, __blocklen, __padlen)	    \
do {									    \
  __padlen = (SILC_PACKET_MAX_PADLEN - (__packetlen) %			    \
	      ((__blocklen) ? (__blocklen) : SILC_PACKET_DEFAULT_PADLEN));  \
} while(0)
+
/* EOS callback.  Delivers end-of-stream to the application through the
   engine callbacks.  Caller must ensure the stream's callback contract
   (locking) is respected at the call site. */
#define SILC_PACKET_CALLBACK_EOS(s)					\
do {									\
  (s)->sc->engine->callbacks->eos((s)->sc->engine, s,			\
				  (s)->sc->engine->callback_context,	\
				  (s)->stream_context);			\
} while(0)
+
/* Error callback.  Delivers packet error `err' to the application
   through the engine callbacks. */
#define SILC_PACKET_CALLBACK_ERROR(s, err)				\
do {									\
  (s)->sc->engine->callbacks->error((s)->sc->engine, s, err,		\
				    (s)->sc->engine->callback_context,	\
				    (s)->stream_context);		\
} while(0)
+
+static SilcBool silc_packet_dispatch(SilcPacket packet);
+static void silc_packet_read_process(SilcPacketStream stream);
+static inline SilcBool silc_packet_send_raw(SilcPacketStream stream,
+ SilcPacketType type,
+ SilcPacketFlags flags,
+ SilcIdType src_id_type,
+ unsigned char *src_id,
+ SilcUInt32 src_id_len,
+ SilcIdType dst_id_type,
+ unsigned char *dst_id,
+ SilcUInt32 dst_id_len,
+ const unsigned char *data,
+ SilcUInt32 data_len,
+ SilcCipher cipher,
+ SilcHmac hmac);
+
+/************************ Static utility functions **************************/
+
/* Injects packet to new stream created with silc_packet_stream_add_remote.
   Scheduled as a zero-timeout task; dispatches the packet unless the
   stream was destroyed in the meantime, then drops the reference taken
   when the task was scheduled. */

SILC_TASK_CALLBACK(silc_packet_stream_inject_packet)
{
  SilcPacket packet = context;
  SilcPacketStream stream = packet->stream;

  SILC_LOG_DEBUG(("Injecting packet %p to stream %p", packet, packet->stream));

  silc_mutex_lock(stream->lock);
  if (!stream->destroyed)
    silc_packet_dispatch(packet);
  silc_mutex_unlock(stream->lock);
  silc_packet_stream_unref(stream);
}
+
/* Write data to the stream.  Must be called with ps->lock locked.  Unlocks
   the lock inside this function, unless no_unlock is TRUE.  Unlocks always
   in case it returns FALSE. */

static inline SilcBool silc_packet_stream_write(SilcPacketStream ps,
						SilcBool no_unlock)
{
  SilcStream stream;
  SilcBool connected;
  int i;

  /* For remote UDP packet streams ps->stream is the parent packet stream;
     write through its underlying stream. */
  if (ps->udp)
    stream = ((SilcPacketStream)ps->stream)->stream;
  else
    stream = ps->stream;

  if (ps->udp && silc_socket_stream_is_udp(stream, &connected)) {
    if (!connected) {
      /* Connectionless UDP stream: send to the saved remote IP/port. */
      while (silc_buffer_len(&ps->outbuf) > 0) {
	i = silc_net_udp_send(stream, ps->remote_udp->remote_ip,
			      ps->remote_udp->remote_port,
			      ps->outbuf.data, silc_buffer_len(&ps->outbuf));
	if (silc_unlikely(i == -2)) {
	  /* Error.  NOTE(review): this path returns FALSE without
	     unlocking ps->lock, unlike the TCP path below — verify
	     against the "unlocks always on FALSE" contract above. */
	  silc_buffer_reset(&ps->outbuf);
	  SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_WRITE);
	  return FALSE;
	}

	if (silc_unlikely(i == -1)) {
	  /* Cannot write now, write later. */
	  if (!no_unlock)
	    silc_mutex_unlock(ps->lock);
	  return TRUE;
	}

	/* Wrote data */
	silc_buffer_pull(&ps->outbuf, i);
      }

      silc_buffer_reset(&ps->outbuf);
      if (!no_unlock)
	silc_mutex_unlock(ps->lock);

      return TRUE;
    }
  }

  /* Write the data to the stream */
  while (silc_buffer_len(&ps->outbuf) > 0) {
    i = silc_stream_write(stream, ps->outbuf.data,
			  silc_buffer_len(&ps->outbuf));
    if (silc_unlikely(i == 0)) {
      /* EOS; unlock before delivering the callback to the application. */
      silc_buffer_reset(&ps->outbuf);
      silc_mutex_unlock(ps->lock);
      SILC_PACKET_CALLBACK_EOS(ps);
      return FALSE;
    }

    if (silc_unlikely(i == -2)) {
      /* Error; unlock before delivering the callback. */
      silc_buffer_reset(&ps->outbuf);
      silc_mutex_unlock(ps->lock);
      SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_WRITE);
      return FALSE;
    }

    if (silc_unlikely(i == -1)) {
      /* Cannot write now, write later.  Scheduler will call us again
	 when the stream becomes writable. */
      if (!no_unlock)
	silc_mutex_unlock(ps->lock);
      return TRUE;
    }

    /* Wrote data */
    silc_buffer_pull(&ps->outbuf, i);
  }

  silc_buffer_reset(&ps->outbuf);
  if (!no_unlock)
    silc_mutex_unlock(ps->lock);

  return TRUE;
}
+
/* Reads data from stream.  Must be called with ps->lock locked.  If this
   returns FALSE the lock has been unlocked.  If this returns packet stream
   to `ret_ps' its lock has been acquired and `ps' lock has been unlocked.
   It is returned if the stream is UDP and remote UDP stream exists for
   the sender of the packet. */

static inline SilcBool silc_packet_stream_read(SilcPacketStream ps,
					       SilcPacketStream *ret_ps)
{
  SilcStream stream = ps->stream;
  SilcBuffer inbuf;
  SilcBool connected;
  int ret;

  /* Get inbuf.  If there is already some data for this stream in the buffer
     we already have it.  Otherwise get the current one from list, it will
     include the data. */
  inbuf = ps->inbuf;
  if (!inbuf) {
    silc_dlist_start(ps->sc->inbufs);
    inbuf = silc_dlist_get(ps->sc->inbufs);
    if (!inbuf) {
      /* Allocate new data input buffer */
      inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65);
      if (!inbuf) {
	silc_mutex_unlock(ps->lock);
	return FALSE;
      }
      silc_buffer_reset(inbuf);
      silc_dlist_add(ps->sc->inbufs, inbuf);
    }
  }

  /* Make sure there is enough room to read.
     NOTE(review): the silc_buffer_realloc result is not checked; on
     allocation failure the subsequent read is bounded by the old tail
     length, but confirm this is the intended degradation. */
  if (SILC_PACKET_DEFAULT_SIZE * 2 > silc_buffer_taillen(inbuf))
    silc_buffer_realloc(inbuf, silc_buffer_truelen(inbuf) +
			(SILC_PACKET_DEFAULT_SIZE * 2));

  if (silc_socket_stream_is_udp(stream, &connected)) {
    if (!connected) {
      /* Connectionless UDP stream, read one UDP packet */
      char remote_ip[64], tuple[64];
      int remote_port;
      SilcPacketStream remote;

      ret = silc_net_udp_receive(stream, remote_ip, sizeof(remote_ip),
				 &remote_port, inbuf->tail,
				 silc_buffer_taillen(inbuf));

      if (silc_unlikely(ret < 0)) {
	silc_mutex_unlock(ps->lock);
	if (ret == -1) {
	  /* Cannot read now, do it later. */
	  return FALSE;
	}

	/* Error */
	silc_buffer_reset(inbuf);
	SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
	return FALSE;
      }

      /* See if remote packet stream exist for this sender.  The key is
	 the "<port><ip>" tuple used by silc_packet_stream_add_remote. */
      silc_snprintf(tuple, sizeof(tuple), "%d%s", remote_port, remote_ip);
      silc_mutex_lock(ps->sc->engine->lock);
      if (silc_hash_table_find(ps->sc->engine->udp_remote, tuple, NULL,
			       (void *)&remote)) {
	silc_mutex_unlock(ps->sc->engine->lock);
	SILC_LOG_DEBUG(("UDP packet from %s:%d for stream %p", remote_ip,
			remote_port, remote));
	/* Hand off to the remote stream: swap locks per the contract
	   described in the function comment. */
	silc_mutex_unlock(ps->lock);
	silc_mutex_lock(remote->lock);
	*ret_ps = remote;
	return TRUE;
      }
      silc_mutex_unlock(ps->sc->engine->lock);

      /* Unknown sender */
      if (!ps->remote_udp) {
	ps->remote_udp = silc_calloc(1, sizeof(*ps->remote_udp));
	if (silc_unlikely(!ps->remote_udp)) {
	  silc_mutex_unlock(ps->lock);
	  SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_NO_MEMORY);
	  return FALSE;
	}
      }

      /* Save sender IP and port.
	 NOTE(review): the strdup result is not checked; remote_ip may
	 end up NULL on OOM — confirm downstream users tolerate that. */
      silc_free(ps->remote_udp->remote_ip);
      ps->remote_udp->remote_ip = strdup(remote_ip);
      ps->remote_udp->remote_port = remote_port;

      silc_buffer_pull_tail(inbuf, ret);
      return TRUE;
    }
  }

  /* Read data from the stream */
  ret = silc_stream_read(stream, inbuf->tail, silc_buffer_taillen(inbuf));
  if (silc_unlikely(ret <= 0)) {
    silc_mutex_unlock(ps->lock);
    if (ret == 0) {
      /* EOS */
      silc_buffer_reset(inbuf);
      SILC_PACKET_CALLBACK_EOS(ps);
      return FALSE;
    }

    if (ret == -1) {
      /* Cannot read now, do it later. */
      return FALSE;
    }

    /* Error */
    silc_buffer_reset(inbuf);
    SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
    return FALSE;
  }

  /* Expose the newly read bytes in the buffer's data area. */
  silc_buffer_pull_tail(inbuf, ret);
  return TRUE;
}
+
/* Our stream IO notifier callback.  Called by the scheduler when the
   underlying stream becomes readable or writable.  Takes ps->lock;
   the read/write helpers define who releases it on each path. */

static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
				  void *context)
{
  SilcPacketStream remote = NULL, ps = context;

  silc_mutex_lock(ps->lock);

  if (silc_unlikely(ps->destroyed)) {
    silc_mutex_unlock(ps->lock);
    return;
  }

  switch (status) {
  case SILC_STREAM_CAN_READ:
    /* Reading is locked also with stream->lock because we may be reading
       at the same time other thread is writing to same underlaying stream. */
    SILC_LOG_DEBUG(("Reading data from stream %p, ps %p", ps->stream, ps));

    /* Read data from stream.  On FALSE the lock is already released. */
    if (!silc_packet_stream_read(ps, &remote))
      return;

    /* Now process the data.  If `remote' was set, the read was handed
       off to a remote UDP stream whose lock we now hold instead. */
    silc_packet_stream_ref(ps);
    if (!remote) {
      silc_packet_read_process(ps);
      silc_mutex_unlock(ps->lock);
    } else {
      silc_packet_read_process(remote);
      silc_mutex_unlock(remote->lock);
    }
    silc_packet_stream_unref(ps);
    break;

  case SILC_STREAM_CAN_WRITE:
    SILC_LOG_DEBUG(("Writing pending data to stream %p, ps %p",
		    ps->stream, ps));

    /* Nothing pending if no data has been consumed from outbuf. */
    if (silc_unlikely(!silc_buffer_headlen(&ps->outbuf))) {
      silc_mutex_unlock(ps->lock);
      return;
    }

    /* Write pending data to stream; it releases the lock. */
    silc_packet_stream_write(ps, FALSE);
    break;

  default:
    silc_mutex_unlock(ps->lock);
    break;
  }
}
+
/* Allocate packet.  Returns a packet from the engine's free list, or
   allocates a fresh one when the pool is empty.  Returns NULL on OOM. */

static SilcPacket silc_packet_alloc(SilcPacketEngine engine)
{
  SilcPacket packet;

  /* NOTE(review): this debug read of the pool count happens before the
     engine lock is taken — harmless for logging, but racy. */
  SILC_LOG_DEBUG(("Packet pool count %d",
		  silc_list_count(engine->packet_pool)));

  silc_mutex_lock(engine->lock);

  /* Get packet from freelist or allocate new one. */
  packet = silc_list_get(engine->packet_pool);
  if (!packet) {
    void *tmp;

    silc_mutex_unlock(engine->lock);

    packet = silc_calloc(1, sizeof(*packet));
    if (silc_unlikely(!packet))
      return NULL;

    SILC_LOG_DEBUG(("Allocating new packet %p", packet));

    tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
    if (silc_unlikely(!tmp)) {
      silc_free(packet);
      return NULL;
    }
    silc_buffer_set(&packet->buffer, tmp, SILC_PACKET_DEFAULT_SIZE);
    silc_buffer_reset(&packet->buffer);

    return packet;
  }

  SILC_LOG_DEBUG(("Get packet %p", packet));

  /* Delete from freelist */
  silc_list_del(engine->packet_pool, packet);

  silc_mutex_unlock(engine->lock);

  return packet;
}
+
/* UDP remote stream hash table destructor.  The table owns only the
   tuple key (allocated with silc_format in silc_packet_stream_add_remote);
   the stream value is owned by its reference count. */

static void silc_packet_engine_hash_destr(void *key, void *context,
					  void *user_context)
{
  silc_free(key);
}
+
/* Per scheduler context hash table destructor.  Clears (zeroizes) and
   frees all data input buffers, then the context itself. */

static void silc_packet_engine_context_destr(void *key, void *context,
					     void *user_context)
{
  SilcPacketEngineContext sc = context;
  SilcBuffer buffer;

  silc_dlist_start(sc->inbufs);
  while ((buffer = silc_dlist_get(sc->inbufs))) {
    /* Clear before free: inbufs may have held sensitive plaintext. */
    silc_buffer_clear(buffer);
    silc_buffer_free(buffer);
    silc_dlist_del(sc->inbufs, buffer);
  }

  silc_dlist_uninit(sc->inbufs);
  silc_free(sc);
}
+
+
+/******************************** Packet API ********************************/
+
+/* Allocate new packet engine */
+
+SilcPacketEngine
+silc_packet_engine_start(SilcRng rng, SilcBool router,
+ const SilcPacketCallbacks *callbacks,
+ void *callback_context)
+{
+ SilcPacketEngine engine;
+ SilcPacket packet;
+ int i;
+ void *tmp;
+
+ SILC_LOG_DEBUG(("Starting new packet engine"));
+
+ if (!callbacks)
+ return NULL;
+ if (!callbacks->packet_receive || !callbacks->eos || !callbacks->error)
+ return NULL;
+
+ engine = silc_calloc(1, sizeof(*engine));
+ if (!engine)
+ return NULL;
+
+ engine->contexts = silc_hash_table_alloc(0, silc_hash_ptr, NULL, NULL, NULL,
+ silc_packet_engine_context_destr,
+ engine, TRUE);
+ if (!engine->contexts) {
+ silc_free(engine);
+ return NULL;
+ }
+
+ engine->rng = rng;
+ engine->local_is_router = router;
+ engine->callbacks = callbacks;
+ engine->callback_context = callback_context;
+ silc_list_init(engine->streams, struct SilcPacketStreamStruct, next);
+ silc_mutex_alloc(&engine->lock);
+
+ /* Allocate packet free list */
+ silc_list_init(engine->packet_pool, struct SilcPacketStruct, next);
+ for (i = 0; i < 5; i++) {
+ packet = silc_calloc(1, sizeof(*packet));
+ if (!packet) {
+ silc_packet_engine_stop(engine);
+ return NULL;
+ }
+
+ tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
+ if (!tmp) {
+ silc_packet_engine_stop(engine);
+ return NULL;
+ }
+ silc_buffer_set(&packet->buffer, tmp, SILC_PACKET_DEFAULT_SIZE);
+ silc_buffer_reset(&packet->buffer);
+
+ silc_list_add(engine->packet_pool, packet);
+ }
+ silc_list_start(engine->packet_pool);
+
+ return engine;
+}
+
/* Stop packet engine.  Frees the pooled packet contexts, the per scheduler
   context table (running its destructor for each context) and the engine
   itself.  Safe to call with NULL. */

void silc_packet_engine_stop(SilcPacketEngine engine)
{
  SilcPacket packet;

  SILC_LOG_DEBUG(("Stopping packet engine"));

  if (!engine)
    return;

  /* Free packet free list */
  silc_list_start(engine->packet_pool);
  while ((packet = silc_list_get(engine->packet_pool))) {
    silc_buffer_purge(&packet->buffer);
    silc_free(packet);
  }

  silc_hash_table_free(engine->contexts);
  silc_mutex_free(engine->lock);
  silc_free(engine);
}
+
/* Human readable packet error strings.  Indexed directly by
   SilcPacketError, so the order here must match the enumeration
   (SILC_PACKET_ERR_READ .. SILC_PACKET_ERR_NO_MEMORY). */
static const char * const packet_error[] = {
  "Cannot read from stream",
  "Cannot write to stream",
  "Packet MAC failed",
  "Packet decryption failed",
  "Unknown SID",
  "Packet is malformed",
  "System out of memory",
};
+
+/* Return packet error string */
+
+const char *silc_packet_error_string(SilcPacketError error)
+{
+ if (error < SILC_PACKET_ERR_READ || error > SILC_PACKET_ERR_NO_MEMORY)
+ return "<invalid error code>";
+ return packet_error[error];
+}
+
+/* Return list of packet streams in the engine */
+
+SilcDList silc_packet_engine_get_streams(SilcPacketEngine engine)
+{
+ SilcDList list;
+ SilcPacketStream ps;
+
+ list = silc_dlist_init();
+ if (!list)
+ return NULL;
+
+ silc_mutex_lock(engine->lock);
+ silc_list_start(engine->streams);
+ while ((ps = silc_list_get(engine->streams))) {
+ silc_packet_stream_ref(ps);
+ silc_dlist_add(list, ps);
+ }
+ silc_mutex_unlock(engine->lock);
+
+ return list;
+}
+
+/* Free list returned by silc_packet_engine_get_streams */
+
+void silc_packet_engine_free_streams_list(SilcDList streams)
+{
+ SilcPacketStream ps;
+
+ silc_dlist_start(streams);
+ while ((ps = silc_dlist_get(streams)))
+ silc_packet_stream_unref(ps);
+
+ silc_dlist_uninit(streams);
+}
+
/* Create new packet stream on top of `stream' and schedule it on
   `schedule'.  Creates (or reuses) the per scheduler context, registers
   the stream with the engine and sets the I/O notifier.  Returns NULL
   on invalid arguments or out of memory. */

SilcPacketStream silc_packet_stream_create(SilcPacketEngine engine,
					   SilcSchedule schedule,
					   SilcStream stream)
{
  SilcPacketStream ps;
  SilcBuffer inbuf;
  void *tmp;

  SILC_LOG_DEBUG(("Creating new packet stream"));

  if (!engine || !stream)
    return NULL;

  ps = silc_calloc(1, sizeof(*ps));
  if (!ps)
    return NULL;

  ps->stream = stream;
  silc_atomic_init8(&ps->refcnt, 1);
  silc_mutex_alloc(&ps->lock);

  /* Allocate out buffer */
  tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
  if (!tmp) {
    silc_packet_stream_destroy(ps);
    return NULL;
  }
  silc_buffer_set(&ps->outbuf, tmp, SILC_PACKET_DEFAULT_SIZE);
  silc_buffer_reset(&ps->outbuf);

  /* Initialize packet processors list */
  ps->process = silc_dlist_init();
  if (!ps->process) {
    silc_packet_stream_destroy(ps);
    return NULL;
  }

  silc_mutex_lock(engine->lock);

  /* Add per scheduler context, creating it on first use for this
     scheduler.  The context is shared by all streams on the scheduler. */
  if (!silc_hash_table_find(engine->contexts, schedule, NULL,
			    (void *)&ps->sc)) {
    ps->sc = silc_calloc(1, sizeof(*ps->sc));
    if (!ps->sc) {
      silc_mutex_unlock(engine->lock);
      silc_packet_stream_destroy(ps);
      return NULL;
    }
    ps->sc->engine = engine;
    ps->sc->schedule = schedule;

    /* Allocate data input buffer */
    inbuf = silc_buffer_alloc(SILC_PACKET_DEFAULT_SIZE * 65);
    if (!inbuf) {
      silc_free(ps->sc);
      ps->sc = NULL;
      silc_mutex_unlock(engine->lock);
      silc_packet_stream_destroy(ps);
      return NULL;
    }
    silc_buffer_reset(inbuf);

    ps->sc->inbufs = silc_dlist_init();
    if (!ps->sc->inbufs) {
      silc_buffer_free(inbuf);
      silc_free(ps->sc);
      ps->sc = NULL;
      silc_mutex_unlock(engine->lock);
      silc_packet_stream_destroy(ps);
      return NULL;
    }
    silc_dlist_add(ps->sc->inbufs, inbuf);

    /* Add to per scheduler context hash table */
    if (!silc_hash_table_add(engine->contexts, schedule, ps->sc)) {
      silc_buffer_free(inbuf);
      silc_dlist_del(ps->sc->inbufs, inbuf);
      silc_free(ps->sc);
      ps->sc = NULL;
      silc_mutex_unlock(engine->lock);
      silc_packet_stream_destroy(ps);
      return NULL;
    }
  }
  ps->sc->stream_count++;

  /* Add the packet stream to engine */
  silc_list_add(engine->streams, ps);

  /* If this is UDP stream, allocate UDP remote stream hash table */
  if (!engine->udp_remote && silc_socket_stream_is_udp(stream, NULL))
    engine->udp_remote = silc_hash_table_alloc(0, silc_hash_string, NULL,
					       silc_hash_string_compare, NULL,
					       silc_packet_engine_hash_destr,
					       NULL, TRUE);

  silc_mutex_unlock(engine->lock);

  /* Set IO notifier callback.  This schedules this stream for I/O. */
  if (!silc_stream_set_notifier(ps->stream, schedule,
				silc_packet_stream_io, ps)) {
    SILC_LOG_DEBUG(("Cannot set stream notifier for packet stream"));
    silc_packet_stream_destroy(ps);
    return NULL;
  }

  SILC_LOG_DEBUG(("Created packet stream %p", ps));

  return ps;
}
+
+/* Add new remote packet stream for UDP packet streams */
+
+SilcPacketStream silc_packet_stream_add_remote(SilcPacketStream stream,
+ const char *remote_ip,
+ SilcUInt16 remote_port,
+ SilcPacket packet)
+{
+ SilcPacketEngine engine = stream->sc->engine;
+ SilcPacketStream ps;
+ char *tuple;
+ void *tmp;
+
+ SILC_LOG_DEBUG(("Adding UDP remote %s:%d to packet stream %p",
+ remote_ip, remote_port, stream));
+
+ if (!stream || !remote_ip || !remote_port)
+ return NULL;
+
+ if (!silc_socket_stream_is_udp(stream->stream, NULL)) {
+ SILC_LOG_ERROR(("Stream is not UDP stream, cannot add remote IP"));
+ return NULL;
+ }
+
+ ps = silc_calloc(1, sizeof(*ps));
+ if (!ps)
+ return NULL;
+ ps->sc = stream->sc;
+
+ silc_atomic_init8(&ps->refcnt, 1);
+ silc_mutex_alloc(&ps->lock);
+
+ /* Set the UDP packet stream as underlaying stream */
+ silc_packet_stream_ref(stream);
+ ps->stream = (SilcStream)stream;
+ ps->udp = TRUE;
+
+ /* Allocate out buffer */
+ tmp = silc_malloc(SILC_PACKET_DEFAULT_SIZE);
+ if (!tmp) {
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
+ silc_buffer_set(&ps->outbuf, tmp, SILC_PACKET_DEFAULT_SIZE);
+ silc_buffer_reset(&ps->outbuf);
+
+ /* Initialize packet procesors list */
+ ps->process = silc_dlist_init();
+ if (!ps->process) {
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
+
+ /* Add to engine with this IP and port pair */
+ tuple = silc_format("%d%s", remote_port, remote_ip);
+ silc_mutex_lock(engine->lock);
+ if (!tuple || !silc_hash_table_add(engine->udp_remote, tuple, ps)) {
+ silc_mutex_unlock(engine->lock);
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
+ silc_mutex_unlock(engine->lock);
+
+ /* Save remote IP and port pair */
+ ps->remote_udp = silc_calloc(1, sizeof(*ps->remote_udp));
+ if (!ps->remote_udp) {
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
+ ps->remote_udp->remote_port = remote_port;
+ ps->remote_udp->remote_ip = strdup(remote_ip);
+ if (!ps->remote_udp->remote_ip) {
+ silc_packet_stream_destroy(ps);
+ return NULL;
+ }
+
+ if (packet) {
+ /* Inject packet to the new stream */
+ packet->stream = ps;
+ silc_packet_stream_ref(ps);
+ silc_schedule_task_add_timeout(silc_stream_get_schedule(stream->stream),
+ silc_packet_stream_inject_packet, packet,
+ 0, 0);
+ }
+
+ return ps;
+}
+
/* Destroy packet stream.  If references remain the stream is only marked
   destroyed (and its underlying stream closed); the final reference drop
   performs the actual teardown: removal from the engine, freeing of
   buffers, processors, ciphers/HMACs and IDs. */

void silc_packet_stream_destroy(SilcPacketStream stream)
{
  SilcPacketEngine engine;

  if (!stream)
    return;

  if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0) {
    /* References still held; mark destroyed and close, destroy later. */
    if (stream->destroyed)
      return;
    stream->destroyed = TRUE;

    SILC_LOG_DEBUG(("Marking packet stream %p destroyed", stream));

    /* Close the underlying stream */
    if (!stream->udp && stream->stream)
      silc_stream_close(stream->stream);
    return;
  }

  SILC_LOG_DEBUG(("Destroying packet stream %p", stream));

  if (!stream->udp) {
    /* Delete from engine */
    if (stream->sc) {
      engine = stream->sc->engine;
      silc_mutex_lock(engine->lock);
      silc_list_del(engine->streams, stream);

      /* Remove per scheduler context, if it is not used anymore */
      stream->sc->stream_count--;
      if (!stream->sc->stream_count)
	silc_hash_table_del(engine->contexts, stream->sc->schedule);

      silc_mutex_unlock(engine->lock);
    }

    /* Destroy the underlying stream */
    if (stream->stream)
      silc_stream_destroy(stream->stream);
  } else {
    /* Delete from UDP remote hash table; the table destructor frees the
       stored tuple key. */
    char tuple[64];
    engine = stream->sc->engine;
    silc_snprintf(tuple, sizeof(tuple), "%d%s",
		  stream->remote_udp->remote_port,
		  stream->remote_udp->remote_ip);
    silc_mutex_lock(engine->lock);
    silc_hash_table_del(engine->udp_remote, tuple);
    silc_mutex_unlock(engine->lock);

    silc_free(stream->remote_udp->remote_ip);
    silc_free(stream->remote_udp);

    /* Unreference the underlying packet stream */
    silc_packet_stream_unref((SilcPacketStream)stream->stream);
  }

  /* Clear and free buffers; clear first since outbuf may have held
     sensitive plaintext. */
  silc_buffer_clear(&stream->outbuf);
  silc_buffer_purge(&stream->outbuf);

  if (stream->process) {
    SilcPacketProcess p;
    silc_dlist_start(stream->process);
    while ((p = silc_dlist_get(stream->process))) {
      silc_free(p->types);
      silc_free(p);
      silc_dlist_del(stream->process, p);
    }
    silc_dlist_uninit(stream->process);
  }

  /* Destroy ciphers and HMACs */
  if (stream->send_key[0])
    silc_cipher_free(stream->send_key[0]);
  if (stream->receive_key[0])
    silc_cipher_free(stream->receive_key[0]);
  if (stream->send_hmac[0])
    silc_hmac_free(stream->send_hmac[0]);
  if (stream->receive_hmac[0])
    silc_hmac_free(stream->receive_hmac[0]);
  if (stream->send_key[1])
    silc_cipher_free(stream->send_key[1]);
  if (stream->receive_key[1])
    silc_cipher_free(stream->receive_key[1]);
  if (stream->send_hmac[1])
    silc_hmac_free(stream->send_hmac[1]);
  if (stream->receive_hmac[1])
    silc_hmac_free(stream->receive_hmac[1]);

  /* Free IDs */
  silc_free(stream->src_id);
  silc_free(stream->dst_id);

  silc_atomic_uninit8(&stream->refcnt);
  silc_mutex_free(stream->lock);
  silc_free(stream);
}
+
+/* Return TRUE if the stream is valid */
+
+SilcBool silc_packet_stream_is_valid(SilcPacketStream stream)
+{
+ return stream->destroyed == FALSE;
+}
+
/* Marks as router stream.  Affects how IDs are handled when sending
   and dispatching packets on this stream. */

void silc_packet_stream_set_router(SilcPacketStream stream)
{
  stream->is_router = TRUE;
}
+
/* Mark to include IV in ciphertext.  When set the per-packet IV is
   transmitted with the packet (used e.g. with connectionless streams). */

void silc_packet_stream_set_iv_included(SilcPacketStream stream)
{
  stream->iv_included = TRUE;
}
+
+/* Links `callbacks' to `stream' for specified packet types */
+
+static SilcBool silc_packet_stream_link_va(SilcPacketStream stream,
+ const SilcPacketCallbacks *callbacks,
+ void *callback_context,
+ int priority, va_list ap)
+{
+ SilcPacketProcess p, e;
+ SilcInt32 packet_type;
+ int i;
+
+ SILC_LOG_DEBUG(("Linking callbacks %p to stream %p", callbacks, stream));
+
+ if (!callbacks)
+ return FALSE;
+ if (!callbacks->packet_receive)
+ return FALSE;
+
+ p = silc_calloc(1, sizeof(*p));
+ if (!p)
+ return FALSE;
+
+ p->priority = priority;
+ p->callbacks = callbacks;
+ p->callback_context = callback_context;
+
+ silc_mutex_lock(stream->lock);
+
+ if (!stream->process) {
+ stream->process = silc_dlist_init();
+ if (!stream->process) {
+ silc_mutex_unlock(stream->lock);
+ return FALSE;
+ }
+ }
+
+ /* According to priority set the procesor to correct position. First
+ entry has the highest priority */
+ silc_dlist_start(stream->process);
+ while ((e = silc_dlist_get(stream->process)) != SILC_LIST_END) {
+ if (p->priority > e->priority) {
+ silc_dlist_insert(stream->process, p);
+ break;
+ }
+ }
+ if (!e)
+ silc_dlist_add(stream->process, p);
+
+ /* Get packet types to process */
+ i = 1;
+ while (1) {
+ packet_type = va_arg(ap, SilcInt32);
+
+ if (packet_type == SILC_PACKET_ANY)
+ break;
+
+ if (packet_type == -1)
+ break;
+
+ p->types = silc_realloc(p->types, sizeof(*p->types) * (i + 1));
+ if (!p->types) {
+ silc_mutex_unlock(stream->lock);
+ return FALSE;
+ }
+
+ p->types[i - 1] = (SilcPacketType)packet_type;
+ i++;
+ }
+ if (p->types)
+ p->types[i - 1] = 0;
+
+ silc_mutex_unlock(stream->lock);
+
+ silc_packet_stream_ref(stream);
+
+ return TRUE;
+}
+
/* Links `callbacks' to `stream' for specified packet types.  Variadic
   wrapper around silc_packet_stream_link_va; the argument list holds
   SilcPacketType values terminated by SILC_PACKET_ANY or -1. */

SilcBool silc_packet_stream_link(SilcPacketStream stream,
				 const SilcPacketCallbacks *callbacks,
				 void *callback_context,
				 int priority, ...)
{
  va_list ap;
  SilcBool ret;

  va_start(ap, priority);
  ret = silc_packet_stream_link_va(stream, callbacks, callback_context,
				   priority, ap);
  va_end(ap);

  return ret;
}
+
/* Unlinks `callbacks' from `stream'.  Removes the first processor entry
   matching both the callbacks and the context, frees the processor list
   when it becomes empty, and drops the reference taken when linking.
   NOTE(review): assumes stream->process is non-NULL here (linked at
   least once and not yet unlinked) — confirm callers guarantee this. */

void silc_packet_stream_unlink(SilcPacketStream stream,
			       const SilcPacketCallbacks *callbacks,
			       void *callback_context)
{
  SilcPacketProcess p;

  SILC_LOG_DEBUG(("Unlinking callbacks %p from stream %p",
		  callbacks, stream));

  silc_mutex_lock(stream->lock);

  silc_dlist_start(stream->process);
  while ((p = silc_dlist_get(stream->process)) != SILC_LIST_END)
    if (p->callbacks == callbacks &&
	p->callback_context == callback_context) {
      silc_dlist_del(stream->process, p);
      silc_free(p->types);
      silc_free(p);
      break;
    }

  if (!silc_dlist_count(stream->process)) {
    silc_dlist_uninit(stream->process);
    stream->process = NULL;
  }

  silc_mutex_unlock(stream->lock);

  silc_packet_stream_unref(stream);
}
+
+/* Returns TRUE if stream is UDP stream */
+
+SilcBool silc_packet_stream_is_udp(SilcPacketStream stream)
+{
+ return stream->udp || silc_socket_stream_is_udp(stream->stream, NULL);
+}
+
+/* Return packet sender IP and port for UDP packet stream */
+
+SilcBool silc_packet_get_sender(SilcPacket packet,
+ const char **sender_ip,
+ SilcUInt16 *sender_port)
+{
+ if (!packet->stream->remote_udp)
+ return FALSE;
+
+ *sender_ip = packet->stream->remote_udp->remote_ip;
+ *sender_port = packet->stream->remote_udp->remote_port;
+
+ return TRUE;
+}
+
/* Reference packet stream.  Increments the atomic reference counter.
   NOTE(review): the debug log re-reads the counter after the increment,
   so the logged values may be stale under concurrent ref/unref. */

void silc_packet_stream_ref(SilcPacketStream stream)
{
  silc_atomic_add_int8(&stream->refcnt, 1);
  SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
		  silc_atomic_get_int8(&stream->refcnt) - 1,
		  silc_atomic_get_int8(&stream->refcnt)));
}
+
/* Unreference packet stream.  Drops one reference; when the last
   reference is released the counter is restored to 1 so that
   silc_packet_stream_destroy's own decrement performs the final
   teardown. */

void silc_packet_stream_unref(SilcPacketStream stream)
{
  SILC_LOG_DEBUG(("Stream %p, refcnt %d->%d", stream,
		  silc_atomic_get_int8(&stream->refcnt),
		  silc_atomic_get_int8(&stream->refcnt) - 1));
  if (silc_atomic_sub_int8(&stream->refcnt, 1) > 0)
    return;
  /* Last reference: re-increment so destroy's decrement reaches zero. */
  silc_atomic_add_int8(&stream->refcnt, 1);
  silc_packet_stream_destroy(stream);
}
+
/* Return the packet engine that owns `stream'. */

SilcPacketEngine silc_packet_get_engine(SilcPacketStream stream)
{
  return stream->sc->engine;
}
+
/* Set application context for packet stream.  The context is delivered
   to the packet callbacks.  Protected by the stream lock against
   concurrent silc_packet_get_context. */

void silc_packet_set_context(SilcPacketStream stream, void *stream_context)
{
  silc_mutex_lock(stream->lock);
  stream->stream_context = stream_context;
  silc_mutex_unlock(stream->lock);
}
+
/* Return application context from packet stream.  Reads under the stream
   lock to pair with silc_packet_set_context. */

void *silc_packet_get_context(SilcPacketStream stream)
{
  void *context;
  silc_mutex_lock(stream->lock);
  context = stream->stream_context;
  silc_mutex_unlock(stream->lock);
  return context;
}
+
/* Change underlying stream.  Removes the I/O notifier from the old
   stream and installs it on the new one.
   NOTE(review): the return value of the second silc_stream_set_notifier
   is ignored — on failure the stream would silently stop receiving I/O
   notifications; confirm this is acceptable for callers. */

void silc_packet_stream_set_stream(SilcPacketStream ps,
				   SilcStream stream)
{
  if (ps->stream)
    silc_stream_set_notifier(ps->stream, ps->sc->schedule, NULL, NULL);
  ps->stream = stream;
  silc_stream_set_notifier(ps->stream, ps->sc->schedule, silc_packet_stream_io,
			   ps);
}
+
/* Return underlying stream.  For remote UDP packet streams this is the
   parent packet stream cast to SilcStream, per SilcPacketStreamStruct. */

SilcStream silc_packet_stream_get_stream(SilcPacketStream stream)
{
  return stream->stream;
}
+
/* Set keys.  Installs new cipher/HMAC objects for sending and receiving.
   When `rekey' is TRUE a REKEY_DONE packet is first sent with the old
   keys.  In IV Included mode the old keys are preserved at index 1 so
   in-flight packets can still be decrypted.  Ownership of the given key
   objects transfers to the stream.  Returns FALSE if the rekey packet
   cannot be sent. */

SilcBool silc_packet_set_keys(SilcPacketStream stream, SilcCipher send_key,
			      SilcCipher receive_key, SilcHmac send_hmac,
			      SilcHmac receive_hmac, SilcBool rekey)
{
  SILC_LOG_DEBUG(("Setting new keys to packet stream %p", stream));

  /* If doing rekey, send REKEY_DONE packet */
  if (rekey) {
    /* This will take stream lock.  NOTE(review): the code below assumes
       the lock is still held after a successful send and write (write is
       called with no_unlock=TRUE), and released on the FALSE paths —
       confirm against silc_packet_send_raw's contract. */
    if (!silc_packet_send_raw(stream, SILC_PACKET_REKEY_DONE, 0,
			      stream->src_id_type, stream->src_id,
			      stream->src_id_len, stream->dst_id_type,
			      stream->dst_id, stream->dst_id_len,
			      NULL, 0, stream->send_key[0],
			      stream->send_hmac[0]))
      return FALSE;

    /* Write the packet to the stream */
    if (!silc_packet_stream_write(stream, TRUE))
      return FALSE;
  } else {
    silc_mutex_lock(stream->lock);
  }

  /* In case IV Included is set, save the old keys */
  if (stream->iv_included) {
    if (stream->send_key[1] && send_key) {
      silc_cipher_free(stream->send_key[1]);
      stream->send_key[1] = stream->send_key[0];
    }
    if (stream->receive_key[1] && receive_key) {
      silc_cipher_free(stream->receive_key[1]);
      stream->receive_key[1] = stream->receive_key[0];
    }
    if (stream->send_hmac[1] && send_hmac) {
      silc_hmac_free(stream->send_hmac[1]);
      stream->send_hmac[1] = stream->send_hmac[0];
    }
    if (stream->receive_hmac[1] && receive_hmac) {
      silc_hmac_free(stream->receive_hmac[1]);
      stream->receive_hmac[1] = stream->receive_hmac[0];
    }
  } else {
    /* No IV Included: the old keys are simply replaced and freed. */
    if (stream->send_key[0] && send_key)
      silc_cipher_free(stream->send_key[0]);
    if (stream->receive_key[0] && receive_key)
      silc_cipher_free(stream->receive_key[0]);
    if (stream->send_hmac[0] && send_hmac)
      silc_hmac_free(stream->send_hmac[0]);
    if (stream->receive_hmac[0] && receive_hmac)
      silc_hmac_free(stream->receive_hmac[0]);
  }

  /* Set keys; NULL arguments leave the corresponding key unchanged. */
  if (send_key)
    stream->send_key[0] = send_key;
  if (receive_key)
    stream->receive_key[0] = receive_key;
  if (send_hmac)
    stream->send_hmac[0] = send_hmac;
  if (receive_hmac)
    stream->receive_hmac[0] = receive_hmac;

  silc_mutex_unlock(stream->lock);
  return TRUE;
}
+
+/* Return the current security parameters (ciphers and HMACs) from the
+   packet stream.  Any of the output pointers may be NULL if the caller
+   is not interested in that parameter.  Returns FALSE if no keys have
+   been set for the stream. */
+
+SilcBool silc_packet_get_keys(SilcPacketStream stream,
+			      SilcCipher *send_key,
+			      SilcCipher *receive_key,
+			      SilcHmac *send_hmac,
+			      SilcHmac *receive_hmac)
+{
+  silc_mutex_lock(stream->lock);
+
+  /* Do the "no keys set" check under the lock; checking before locking
+     raced with silc_packet_set_keys running in another thread. */
+  if (!stream->send_key[0] && !stream->receive_key[0] &&
+      !stream->send_hmac[0] && !stream->receive_hmac[0]) {
+    silc_mutex_unlock(stream->lock);
+    return FALSE;
+  }
+
+  if (send_key)
+    *send_key = stream->send_key[0];
+  if (receive_key)
+    *receive_key = stream->receive_key[0];
+  if (send_hmac)
+    *send_hmac = stream->send_hmac[0];
+  if (receive_hmac)
+    *receive_hmac = stream->receive_hmac[0];
+
+  silc_mutex_unlock(stream->lock);
+
+  return TRUE;
+}
+
+/* Set SILC IDs to packet stream.  Either `src_id' or `dst_id' may be NULL
+   to leave that ID unchanged.  On failure the previously set IDs remain
+   intact.  (The old code freed the existing ID before the conversion that
+   could fail, leaving a dangling stream->src_id/dst_id pointer behind on
+   the error path.) */
+
+SilcBool silc_packet_set_ids(SilcPacketStream stream,
+			     SilcIdType src_id_type, const void *src_id,
+			     SilcIdType dst_id_type, const void *dst_id)
+{
+  SilcUInt32 len;
+  unsigned char tmp[32];
+  unsigned char *id;
+
+  if (!src_id && !dst_id)
+    return FALSE;
+
+  silc_mutex_lock(stream->lock);
+
+  if (src_id) {
+    SILC_LOG_DEBUG(("Setting source ID to packet stream %p", stream));
+
+    /* Encode and duplicate the new ID first so that a failure cannot
+       leave stream->src_id dangling. */
+    if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len)) {
+      silc_mutex_unlock(stream->lock);
+      return FALSE;
+    }
+    id = silc_memdup(tmp, len);
+    if (!id) {
+      silc_mutex_unlock(stream->lock);
+      return FALSE;
+    }
+    silc_free(stream->src_id);
+    stream->src_id = id;
+    stream->src_id_type = src_id_type;
+    stream->src_id_len = len;
+  }
+
+  if (dst_id) {
+    SILC_LOG_DEBUG(("Setting destination ID to packet stream %p", stream));
+
+    /* Same ordering as above: replace the old ID only on full success. */
+    if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len)) {
+      silc_mutex_unlock(stream->lock);
+      return FALSE;
+    }
+    id = silc_memdup(tmp, len);
+    if (!id) {
+      silc_mutex_unlock(stream->lock);
+      return FALSE;
+    }
+    silc_free(stream->dst_id);
+    stream->dst_id = id;
+    stream->dst_id_type = dst_id_type;
+    stream->dst_id_len = len;
+  }
+
+  silc_mutex_unlock(stream->lock);
+
+  return TRUE;
+}
+
+/* Return IDs from the packet stream.  `src_id_set'/`dst_id_set' (if given)
+   are set to TRUE when the stream has the corresponding ID, FALSE when it
+   does not.  (Previously the flags were left untouched when the stream had
+   no ID, so the caller could read an indeterminate value.)  Returns FALSE
+   if decoding a stored ID fails. */
+
+SilcBool silc_packet_get_ids(SilcPacketStream stream,
+			     SilcBool *src_id_set, SilcID *src_id,
+			     SilcBool *dst_id_set, SilcID *dst_id)
+{
+  /* Default both flags to "not set" so the outputs are always defined. */
+  if (src_id_set)
+    *src_id_set = FALSE;
+  if (dst_id_set)
+    *dst_id_set = FALSE;
+
+  if (src_id && stream->src_id)
+    if (!silc_id_str2id2(stream->src_id, stream->src_id_len,
+			 stream->src_id_type, src_id))
+      return FALSE;
+
+  if (stream->src_id && src_id_set)
+    *src_id_set = TRUE;
+
+  if (dst_id && stream->dst_id)
+    if (!silc_id_str2id2(stream->dst_id, stream->dst_id_len,
+			 stream->dst_id_type, dst_id))
+      return FALSE;
+
+  if (stream->dst_id && dst_id_set)
+    *dst_id_set = TRUE;
+
+  return TRUE;
+}
+
+/* Set the Security ID (SID) for the packet stream.  Valid only when the
+   IV Included property has been set; fails otherwise. */
+
+SilcBool silc_packet_set_sid(SilcPacketStream stream, SilcUInt8 sid)
+{
+  /* SIDs are meaningful only together with IV Included mode. */
+  if (!stream->iv_included)
+    return FALSE;
+
+  SILC_LOG_DEBUG(("Set packet stream %p SID to %d", stream, sid));
+  stream->sid = sid;
+
+  return TRUE;
+}
+
+/* Release a processed packet back to the engine's packet free list */
+
+void silc_packet_free(SilcPacket packet)
+{
+  SilcPacketStream stream = packet->stream;
+  SilcPacketEngine engine;
+
+  SILC_LOG_DEBUG(("Freeing packet %p", packet));
+
+  /* A NULL stream pointer means the packet is already in the free list
+     (double free). */
+  SILC_ASSERT(packet->stream != NULL);
+
+  /* Clear the references the packet holds before recycling it. */
+  packet->stream = NULL;
+  packet->src_id = packet->dst_id = NULL;
+  silc_buffer_reset(&packet->buffer);
+
+  engine = stream->sc->engine;
+  silc_mutex_lock(engine->lock);
+
+  /* Return the packet to the engine free list; (re)initialize the list
+     iterator when this is the only pooled packet. */
+  silc_list_add(engine->packet_pool, packet);
+  if (silc_list_count(engine->packet_pool) == 1)
+    silc_list_start(engine->packet_pool);
+
+  silc_mutex_unlock(engine->lock);
+}
+
+/****************************** Packet Sending ******************************/
+
+/* Prepare outgoing data buffer for packet sending. Returns the
+ pointer to that buffer into the `packet'. */