+ SILC_ASSERT(pipe_index < queue->num_pipes);
+
+ SILC_LOG_DEBUG(("Push data %p to thread queue %p, pipe %d, demux %s",
+ data, queue, pipe_index, demux ? "yes" : "no"));
+
+ silc_mutex_lock(queue->pipes[pipe_index].lock);
+
+ d = silc_list_pop(queue->pipes[pipe_index].freelist);
+ if (!d) {
+ d = silc_calloc(1, sizeof(*d));
+ if (!d)
+ return;
+ }
+ d->data = data;
+
+ if (demux) {
+ for (i = 0; i < queue->num_pipes; i++) {
+ if (queue->fifo)
+ silc_list_add(queue->pipes[i].queue, d);
+ else
+ silc_list_insert(queue->pipes[i].queue, NULL, d);
+ }
+ } else {
+ if (queue->fifo)
+ silc_list_add(queue->pipes[pipe_index].queue, d);
+ else
+ silc_list_insert(queue->pipes[pipe_index].queue, NULL, d);
+ }