Proper locking in packet receiving.
authorPekka Riikonen <priikone@silcnet.org>
Wed, 5 Jul 2006 09:38:32 +0000 (09:38 +0000)
committerPekka Riikonen <priikone@silcnet.org>
Wed, 5 Jul 2006 09:38:32 +0000 (09:38 +0000)
lib/silccore/silcpacket.c

index 5f3c55d092ba0c94c75f41ba55c1d2488bc38fcd..c19b9d08aadffb6490ae55566c25a118370c91a6 100644 (file)
@@ -211,17 +211,15 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
     break;
 
   case SILC_STREAM_CAN_READ:
-    /* Packet receiving can only happen in one thread for one SilcPacketStream,
-       so locking is not required in packet receiving procedure. */
-    silc_mutex_unlock(ps->lock);
-
     SILC_LOG_DEBUG(("Reading data from stream"));
 
     /* Make sure we have fair amount of free space in inbuf */
     if (silc_buffer_taillen(&ps->inbuf) < SILC_PACKET_DEFAULT_SIZE)
       if (!silc_buffer_realloc(&ps->inbuf, silc_buffer_truelen(&ps->inbuf) +
-                              SILC_PACKET_DEFAULT_SIZE * 2))
+                              SILC_PACKET_DEFAULT_SIZE * 2)) {
+       silc_mutex_unlock(ps->lock);
        return;
+      }
 
     /* Read data from stream */
     ret = silc_stream_read(ps->stream, ps->inbuf.tail,
@@ -230,6 +228,7 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
     if (ret == 0) {
       /* EOS */
       silc_buffer_reset(&ps->inbuf);
+      silc_mutex_unlock(ps->lock);
       SILC_PACKET_CALLBACK_EOS(ps);
       return;
     }
@@ -237,6 +236,7 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
     if (ret == -2) {
       /* Error */
       silc_buffer_reset(&ps->inbuf);
+      silc_mutex_unlock(ps->lock);
       SILC_PACKET_CALLBACK_ERROR(ps, SILC_PACKET_ERR_READ);
       return;
     }
@@ -244,15 +244,15 @@ static void silc_packet_stream_io(SilcStream stream, SilcStreamStatus status,
     if (ret == -1) {
       /* Cannot read now, do it later. */
       silc_buffer_pull(&ps->inbuf, silc_buffer_len(&ps->inbuf));
+      silc_mutex_unlock(ps->lock);
       return;
     }
 
-    /* Read some data */
-    silc_buffer_pull_tail(&ps->inbuf, ret);
-
     /* Now process the data */
+    silc_buffer_pull_tail(&ps->inbuf, ret);
     silc_packet_read_process(ps);
 
+    silc_mutex_unlock(ps->lock);
     break;
 
   default:
@@ -446,13 +446,16 @@ void silc_packet_stream_destroy(SilcPacketStream stream)
   /* Clear and free buffers */
   silc_buffer_clear(&stream->inbuf);
   silc_buffer_clear(&stream->outbuf);
-  silc_free(silc_buffer_steal(&stream->inbuf, NULL));
-  silc_free(silc_buffer_steal(&stream->outbuf, NULL));
-
-  silc_dlist_uninit(stream->process);
+  silc_buffer_purge(&stream->inbuf);
+  silc_buffer_purge(&stream->outbuf);
 
   /* XXX */
 
+  /* Destroy the underlaying stream */
+  silc_stream_destroy(stream->stream);
+
+  silc_dlist_uninit(stream->process);
+  silc_mutex_free(stream->lock);
   silc_free(stream);
 }
 
@@ -621,14 +624,20 @@ SilcPacketEngine silc_packet_get_engine(SilcPacketStream stream)
 
 void silc_packet_set_context(SilcPacketStream stream, void *stream_context)
 {
+  silc_mutex_lock(stream->lock);
   stream->stream_context = stream_context;
+  silc_mutex_unlock(stream->lock);
 }
 
 /* Return application context from packet stream */
 
 void *silc_packet_get_context(SilcPacketStream stream)
 {
-  return stream->stream_context;
+  void *context;
+  silc_mutex_lock(stream->lock);
+  context = stream->stream_context;
+  silc_mutex_unlock(stream->lock);
+  return context;
 }
 
 /* Return underlaying stream */
@@ -644,8 +653,10 @@ void silc_packet_set_ciphers(SilcPacketStream stream, SilcCipher send,
                             SilcCipher receive)
 {
   SILC_LOG_DEBUG(("Setting new ciphers to packet stream"));
+  silc_mutex_lock(stream->lock);
   stream->send_key = send;
   stream->receive_key = receive;
+  silc_mutex_unlock(stream->lock);
 }
 
 /* Return current ciphers from packet stream */
@@ -656,11 +667,15 @@ SilcBool silc_packet_get_ciphers(SilcPacketStream stream, SilcCipher *send,
   if (!stream->send_key && !stream->receive_key)
     return FALSE;
 
+  silc_mutex_lock(stream->lock);
+
   if (send)
     *send = stream->send_key;
   if (receive)
     *receive = stream->receive_key;
 
+  silc_mutex_unlock(stream->lock);
+
   return TRUE;
 }
 
@@ -670,8 +685,10 @@ void silc_packet_set_hmacs(SilcPacketStream stream, SilcHmac send,
                           SilcHmac receive)
 {
   SILC_LOG_DEBUG(("Setting new HMACs to packet stream"));
+  silc_mutex_lock(stream->lock);
   stream->send_hmac = send;
   stream->receive_hmac = receive;
+  silc_mutex_unlock(stream->lock);
 }
 
 /* Return current HMACs from packet stream */
@@ -682,11 +699,15 @@ SilcBool silc_packet_get_hmacs(SilcPacketStream stream, SilcHmac *send,
   if (!stream->send_hmac && !stream->receive_hmac)
     return FALSE;
 
+  silc_mutex_lock(stream->lock);
+
   if (send)
     *send = stream->send_hmac;
   if (receive)
     *receive = stream->receive_hmac;
 
+  silc_mutex_unlock(stream->lock);
+
   return TRUE;
 }
 
@@ -704,28 +725,40 @@ SilcBool silc_packet_set_ids(SilcPacketStream stream,
 
   SILC_LOG_DEBUG(("Setting new IDs to packet stream"));
 
+  silc_mutex_lock(stream->lock);
+
   if (src_id) {
     silc_free(stream->src_id);
-    if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len))
+    if (!silc_id_id2str(src_id, src_id_type, tmp, sizeof(tmp), &len)) {
+      silc_mutex_unlock(stream->lock);
       return FALSE;
+    }
     stream->src_id = silc_memdup(tmp, len);
-    if (!stream->src_id)
+    if (!stream->src_id) {
+      silc_mutex_unlock(stream->lock);
       return FALSE;
+    }
     stream->src_id_type = src_id_type;
     stream->src_id_len = len;
   }
 
   if (dst_id) {
     silc_free(stream->dst_id);
-    if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len))
+    if (!silc_id_id2str(dst_id, dst_id_type, tmp, sizeof(tmp), &len)) {
+      silc_mutex_unlock(stream->lock);
       return FALSE;
+    }
     stream->dst_id = silc_memdup(tmp, len);
-    if (!stream->dst_id)
+    if (!stream->dst_id) {
+      silc_mutex_unlock(stream->lock);
       return FALSE;
+    }
     stream->dst_id_type = dst_id_type;
     stream->dst_id_len = len;
   }
 
+  silc_mutex_unlock(stream->lock);
+
   return TRUE;
 }
 
@@ -1171,7 +1204,7 @@ static SilcBool silc_packet_parse(SilcPacket packet)
   return TRUE;
 }
 
-/* Dispatch packet to application */
+/* Dispatch packet to application.  Called with stream->lock locked. */
 
 static void silc_packet_dispatch(SilcPacket packet)
 {
@@ -1182,7 +1215,9 @@ static void silc_packet_dispatch(SilcPacket packet)
 
   /* Parse the packet */
   if (!silc_packet_parse(packet)) {
+    silc_mutex_unlock(packet->stream->lock);
     SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MALFORMED);
+    silc_mutex_lock(packet->stream->lock);
     silc_packet_free(packet);
     return;
   }
@@ -1192,11 +1227,13 @@ static void silc_packet_dispatch(SilcPacket packet)
   if (!stream->process) {
     /* Send to default processor as no others exist */
     SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
+    silc_mutex_unlock(packet->stream->lock);
     if (!stream->engine->callbacks->
        packet_receive(stream->engine, stream, packet,
                       stream->engine->callback_context,
                       stream->stream_context))
       silc_packet_free(packet);
+    silc_mutex_lock(packet->stream->lock);
     return;
   }
 
@@ -1208,52 +1245,68 @@ static void silc_packet_dispatch(SilcPacket packet)
     if (!default_sent && p->priority <= 0) {
       SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
       default_sent = TRUE;
+      silc_mutex_unlock(packet->stream->lock);
       if (stream->engine->callbacks->
          packet_receive(stream->engine, stream, packet,
                         stream->engine->callback_context,
                         stream->stream_context)) {
+       silc_mutex_lock(packet->stream->lock);
        return;
       }
+      silc_mutex_lock(packet->stream->lock);
     }
 
     /* Send to processor */
     if (!p->types) {
       /* Send all packet types */
       SILC_LOG_DEBUG(("Dispatching packet to %p callbacks", p->callbacks));
+      silc_mutex_unlock(packet->stream->lock);
       if (p->callbacks->packet_receive(stream->engine, stream, packet,
                                       p->callback_context,
-                                      stream->stream_context))
+                                      stream->stream_context)) {
+       silc_mutex_lock(packet->stream->lock);
        return;
+      }
+      silc_mutex_lock(packet->stream->lock);
     } else {
       /* Send specific types */
-      for (pt = p->types; *pt; pt++)
-       if (*pt == packet->type) {
-         SILC_LOG_DEBUG(("Dispatching packet to %p callbacks",
-                         p->callbacks));
-         if (p->callbacks->packet_receive(stream->engine, stream, packet,
-                                          p->callback_context,
-                                          stream->stream_context))
-           return;
-         break;
+      for (pt = p->types; *pt; pt++) {
+       if (*pt != packet->type)
+         continue;
+       SILC_LOG_DEBUG(("Dispatching packet to %p callbacks", p->callbacks));
+       silc_mutex_unlock(packet->stream->lock);
+       if (p->callbacks->packet_receive(stream->engine, stream, packet,
+                                        p->callback_context,
+                                        stream->stream_context)) {
+         silc_mutex_lock(packet->stream->lock);
+         return;
        }
+       silc_mutex_lock(packet->stream->lock);
+       break;
+      }
     }
   }
 
   if (!default_sent) {
     /* Send to default processor as it has not been sent yet */
     SILC_LOG_DEBUG(("Dispatching packet to default callbacks"));
+    silc_mutex_unlock(packet->stream->lock);
     if (stream->engine->callbacks->
        packet_receive(stream->engine, stream, packet,
                       stream->engine->callback_context,
-                      stream->stream_context))
+                      stream->stream_context)) {
+      silc_mutex_lock(packet->stream->lock);
       return;
+    }
+    silc_mutex_lock(packet->stream->lock);
   }
 
   /* If we got here, no one wanted the packet, so drop it */
   silc_packet_free(packet);
 }
 
-/* Process incoming data and parse packets. */
+/* Process incoming data and parse packets.  Called with stream->lock
+   locked. */
 
 static void silc_packet_read_process(SilcPacketStream stream)
 {
@@ -1296,7 +1349,9 @@ static void silc_packet_read_process(SilcPacketStream stream)
     /* Sanity checks */
     if (packetlen < SILC_PACKET_MIN_LEN) {
       SILC_LOG_ERROR(("Received too short packet"));
+      silc_mutex_unlock(stream->lock);
       SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MALFORMED);
+      silc_mutex_lock(stream->lock);
       memset(tmp, 0, sizeof(tmp));
       silc_buffer_reset(&stream->inbuf);
       return;
@@ -1314,7 +1369,9 @@ static void silc_packet_read_process(SilcPacketStream stream)
     if (!silc_packet_check_mac(stream->receive_hmac, stream->inbuf.data,
                               paddedlen, stream->inbuf.data + paddedlen,
                               stream->receive_psn)) {
+      silc_mutex_unlock(stream->lock);
       SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_MAC_FAILED);
+      silc_mutex_lock(stream->lock);
       memset(tmp, 0, sizeof(tmp));
       silc_buffer_reset(&stream->inbuf);
       return;
@@ -1323,7 +1380,9 @@ static void silc_packet_read_process(SilcPacketStream stream)
     /* Get packet */
     packet = silc_packet_alloc(stream->engine);
     if (!packet) {
+      silc_mutex_unlock(stream->lock);
       SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_NO_MEMORY);
+      silc_mutex_lock(stream->lock);
       memset(tmp, 0, sizeof(tmp));
       silc_buffer_reset(&stream->inbuf);
       return;
@@ -1335,7 +1394,9 @@ static void silc_packet_read_process(SilcPacketStream stream)
                               silc_buffer_truelen(&packet->buffer) +
                               (paddedlen -
                                silc_buffer_truelen(&packet->buffer)))) {
+       silc_mutex_unlock(stream->lock);
        SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_NO_MEMORY);
+       silc_mutex_lock(stream->lock);
        silc_packet_free(packet);
        memset(tmp, 0, sizeof(tmp));
        silc_buffer_reset(&stream->inbuf);
@@ -1378,7 +1439,9 @@ static void silc_packet_read_process(SilcPacketStream stream)
       ret = silc_packet_decrypt(stream->receive_key, stream->receive_hmac,
                                stream->receive_psn, &packet->buffer, normal);
       if (ret < 0) {
+       silc_mutex_unlock(stream->lock);
        SILC_PACKET_CALLBACK_ERROR(stream, SILC_PACKET_ERR_DECRYPTION_FAILED);
+       silc_mutex_lock(stream->lock);
        silc_packet_free(packet);
        memset(tmp, 0, sizeof(tmp));
        return;