Scheduler: made scheduled signals thread specific
[runtime.git] / lib / silcutil / unix / silcunixschedule.c
1 /*
2
3   silcunixschedule.c
4
5   Author: Pekka Riikonen <priikone@silcnet.org>
6
7   Copyright (C) 1998 - 2008 Pekka Riikonen
8
9   This program is free software; you can redistribute it and/or modify
10   it under the terms of the GNU General Public License as published by
11   the Free Software Foundation; version 2 of the License.
12
13   This program is distributed in the hope that it will be useful,
14   but WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   GNU General Public License for more details.
17
18 */
19
20 #include "silcruntime.h"
21
22 #if defined(HAVE_EPOLL_WAIT)
23 #include <sys/epoll.h>
24 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
25 #include <poll.h>
26 #endif
27
28 const SilcScheduleOps schedule_ops;
29
30 /* Internal context. */
31 typedef struct {
32 #if defined(HAVE_EPOLL_WAIT)
33   struct epoll_event *fds;
34   SilcUInt32 fds_count;
35   int epfd;
36 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
37   struct rlimit nofile;
38   struct pollfd *fds;
39   SilcUInt32 fds_count;
40 #endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */
41   void *app_context;
42   int wakeup_pipe[2];
43   SilcTask wakeup_task;
44   sigset_t signals;
45   sigset_t signals_blocked;
46 } *SilcUnixScheduler;
47
48 typedef struct {
49   SilcUInt32 sig;
50   SilcTaskCallback callback;
51   void *context;
52   SilcBool call;
53   SilcSchedule schedule;
54 } SilcUnixSignal;
55
56 #define SIGNAL_COUNT 32
57
58 #if defined(HAVE_EPOLL_WAIT)
59
60 /* Linux's fast epoll system (level triggered) */
61
62 int silc_epoll(SilcSchedule schedule, void *context)
63 {
64   SilcUnixScheduler internal = context;
65   SilcTaskFd task;
66   struct epoll_event *fds = internal->fds;
67   SilcUInt32 fds_count = internal->fds_count;
68   int ret, i, timeout = -1;
69
70   /* Allocate larger fd table if needed */
71   i = silc_hash_table_count(schedule->fd_queue);
72   if (i > fds_count) {
73     fds = silc_realloc(internal->fds, sizeof(*internal->fds) *
74                        (fds_count + (i / 2)));
75     if (silc_likely(fds)) {
76       internal->fds = fds;
77       internal->fds_count = fds_count = fds_count + (i / 2);
78     }
79   }
80
81   if (schedule->has_timeout)
82     timeout = ((schedule->timeout.tv_sec * 1000) +
83                (schedule->timeout.tv_usec / 1000));
84
85   SILC_SCHEDULE_UNLOCK(schedule);
86   ret = epoll_wait(internal->epfd, fds, fds_count, timeout);
87   SILC_SCHEDULE_LOCK(schedule);
88   if (ret <= 0)
89     return ret;
90
91   silc_list_init(schedule->fd_dispatch, struct SilcTaskStruct, next);
92
93   for (i = 0; i < ret; i++) {
94     task = fds[i].data.ptr;
95     task->revents = 0;
96     if (!task->header.valid || !task->events) {
97       epoll_ctl(internal->epfd, EPOLL_CTL_DEL, task->fd, &fds[i]);
98       continue;
99     }
100     if (fds[i].events & (EPOLLIN | EPOLLPRI | EPOLLHUP | EPOLLERR))
101       task->revents |= SILC_TASK_READ;
102     if (fds[i].events & EPOLLOUT)
103       task->revents |= SILC_TASK_WRITE;
104     silc_list_add(schedule->fd_dispatch, task);
105   }
106
107   return ret;
108 }
109
110 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
111
112 /* Calls normal poll() system call. */
113
114 int silc_poll(SilcSchedule schedule, void *context)
115 {
116   SilcUnixScheduler internal = context;
117   SilcHashTableList htl;
118   SilcTaskFd task;
119   struct pollfd *fds = internal->fds;
120   SilcUInt32 fds_count = internal->fds_count;
121   int fd, ret, i = 0, timeout = -1;
122   void *fdp;
123
124   silc_hash_table_list(schedule->fd_queue, &htl);
125   while (silc_hash_table_get(&htl, &fdp, (void *)&task)) {
126     if (!task->events)
127       continue;
128     fd = SILC_PTR_TO_32(fdp);
129
130     /* Allocate larger fd table if needed */
131     if (i >= fds_count) {
132       struct rlimit nofile;
133
134       fds = silc_realloc(internal->fds, sizeof(*internal->fds) *
135                          (fds_count + (fds_count / 2)));
136       if (silc_unlikely(!fds))
137         break;
138       internal->fds = fds;
139       internal->fds_count = fds_count = fds_count + (fds_count / 2);
140       internal->nofile.rlim_cur = fds_count;
141       if (fds_count > internal->nofile.rlim_max)
142         internal->nofile.rlim_max = fds_count;
143       if (setrlimit(RLIMIT_NOFILE, &nofile) < 0)
144         break;
145     }
146
147     fds[i].fd = fd;
148     fds[i].events = 0;
149     task->revents = fds[i].revents = 0;
150
151     if (task->events & SILC_TASK_READ)
152       fds[i].events |= (POLLIN | POLLPRI);
153     if (task->events & SILC_TASK_WRITE)
154       fds[i].events |= POLLOUT;
155     i++;
156   }
157   silc_hash_table_list_reset(&htl);
158   silc_list_init(schedule->fd_dispatch, struct SilcTaskStruct, next);
159
160   if (schedule->has_timeout)
161     timeout = ((schedule->timeout.tv_sec * 1000) +
162                (schedule->timeout.tv_usec / 1000));
163
164   fds_count = i;
165   SILC_SCHEDULE_UNLOCK(schedule);
166   ret = poll(fds, fds_count, timeout);
167   SILC_SCHEDULE_LOCK(schedule);
168   if (ret <= 0)
169     return ret;
170
171   for (i = 0; i < fds_count; i++) {
172     if (!fds[i].revents)
173       continue;
174     if (!silc_hash_table_find(schedule->fd_queue, SILC_32_TO_PTR(fds[i].fd),
175                               NULL, (void *)&task))
176       continue;
177     if (!task->header.valid || !task->events)
178       continue;
179
180     fd = fds[i].revents;
181     if (fd & (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLNVAL))
182       task->revents |= SILC_TASK_READ;
183     if (fd & POLLOUT)
184       task->revents |= SILC_TASK_WRITE;
185     silc_list_add(schedule->fd_dispatch, task);
186   }
187
188   return ret;
189 }
190
191 #else
192
193 /* Calls normal select() system call. */
194
195 int silc_select(SilcSchedule schedule, void *context)
196 {
197   SilcHashTableList htl;
198   SilcTaskFd task;
199   fd_set in, out;
200   int fd, max_fd = 0, ret;
201   void *fdp;
202
203   FD_ZERO(&in);
204   FD_ZERO(&out);
205
206   silc_hash_table_list(schedule->fd_queue, &htl);
207   while (silc_hash_table_get(&htl, &fdp, (void *)&task)) {
208     if (!task->events)
209       continue;
210     fd = SILC_PTR_TO_32(fdp);
211
212 #ifdef FD_SETSIZE
213     if (fd >= FD_SETSIZE)
214       break;
215 #endif /* FD_SETSIZE */
216
217     if (fd > max_fd)
218       max_fd = fd;
219
220     if (task->events & SILC_TASK_READ)
221       FD_SET(fd, &in);
222     if (task->events & SILC_TASK_WRITE)
223       FD_SET(fd, &out);
224
225     task->revents = 0;
226   }
227   silc_hash_table_list_reset(&htl);
228   silc_list_init(schedule->fd_dispatch, struct SilcTaskStruct, next);
229
230   SILC_SCHEDULE_UNLOCK(schedule);
231   ret = select(max_fd + 1, &in, &out, NULL, (schedule->has_timeout ?
232                                              &schedule->timeout : NULL));
233   SILC_SCHEDULE_LOCK(schedule);
234   if (ret <= 0)
235     return ret;
236
237   silc_hash_table_list(schedule->fd_queue, &htl);
238   while (silc_hash_table_get(&htl, &fdp, (void *)&task)) {
239     if (!task->header.valid || !task->events)
240       continue;
241     fd = SILC_PTR_TO_32(fdp);
242
243 #ifdef FD_SETSIZE
244     if (fd >= FD_SETSIZE)
245       break;
246 #endif /* FD_SETSIZE */
247
248     if (FD_ISSET(fd, &in))
249       task->revents |= SILC_TASK_READ;
250     if (FD_ISSET(fd, &out))
251       task->revents |= SILC_TASK_WRITE;
252     silc_list_add(schedule->fd_dispatch, task);
253   }
254   silc_hash_table_list_reset(&htl);
255
256   return ret;
257 }
258
259 #endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */
260
261 /* Schedule `task' with events `event_mask'. Zero `event_mask' unschedules. */
262
263 SilcBool silc_schedule_internal_schedule_fd(SilcSchedule schedule,
264                                             void *context,
265                                             SilcTaskFd task,
266                                             SilcTaskEvent event_mask)
267 {
268 #if defined(HAVE_EPOLL_WAIT)
269   SilcUnixScheduler internal = (SilcUnixScheduler)context;
270   struct epoll_event event;
271
272   if (!internal)
273     return TRUE;
274
275   SILC_LOG_DEBUG(("Scheduling fd %lu, mask %x", task->fd, event_mask));
276
277   memset(&event, 0, sizeof(event));
278   if (event_mask & SILC_TASK_READ)
279     event.events |= (EPOLLIN | EPOLLPRI);
280   if (event_mask & SILC_TASK_WRITE)
281     event.events |= EPOLLOUT;
282
283   /* Zero mask unschedules task */
284   if (silc_unlikely(!event.events)) {
285     if (epoll_ctl(internal->epfd, EPOLL_CTL_DEL, task->fd, &event)) {
286       SILC_LOG_DEBUG(("epoll_ctl (DEL): %s", strerror(errno)));
287       return FALSE;
288     }
289     task->scheduled = FALSE;
290     return TRUE;
291   }
292
293   /* Schedule the task */
294   if (silc_unlikely(!task->scheduled)) {
295     event.data.ptr = task;
296     if (epoll_ctl(internal->epfd, EPOLL_CTL_ADD, task->fd, &event)) {
297       SILC_LOG_DEBUG(("epoll_ctl (ADD): %s", strerror(errno)));
298       return FALSE;
299     }
300     task->scheduled = TRUE;
301     return TRUE;
302   }
303
304   /* Schedule for specific mask */
305   event.data.ptr = task;
306   if (epoll_ctl(internal->epfd, EPOLL_CTL_MOD, task->fd, &event)) {
307     SILC_LOG_DEBUG(("epoll_ctl (MOD): %s", strerror(errno)));
308     return FALSE;
309   }
310 #endif /* HAVE_EPOLL_WAIT */
311   return TRUE;
312 }
313
314 #ifdef SILC_THREADS
315
316 SILC_TASK_CALLBACK(silc_schedule_wakeup_cb)
317 {
318   SilcUnixScheduler internal = (SilcUnixScheduler)context;
319   unsigned char c;
320
321   SILC_LOG_DEBUG(("Wokeup"));
322
323   (void)read(internal->wakeup_pipe[0], &c, 1);
324 }
325
326 SILC_TASK_CALLBACK(silc_schedule_wakeup_init)
327 {
328   SilcUnixScheduler internal = schedule->internal;
329
330   internal->wakeup_task =
331     silc_schedule_task_add(schedule, internal->wakeup_pipe[0],
332                            silc_schedule_wakeup_cb, internal,
333                            0, 0, SILC_TASK_FD);
334   if (!internal->wakeup_task) {
335     SILC_LOG_WARNING(("Could not add a wakeup task, threads won't work"));
336     close(internal->wakeup_pipe[0]);
337     return;
338   }
339   silc_schedule_internal_schedule_fd(schedule, internal,
340                                      (SilcTaskFd)internal->wakeup_task,
341                                      SILC_TASK_READ);
342 }
343 #endif /* SILC_THREADS */
344
345 /* Initializes the platform specific scheduler.  This for example initializes
346    the wakeup mechanism of the scheduler.  In multi-threaded environment
347    the scheduler needs to be woken up when tasks are added or removed from
348    the task queues.  Returns context to the platform specific scheduler. */
349
350 void *silc_schedule_internal_init(SilcSchedule schedule,
351                                   void *app_context)
352 {
353   SilcUnixScheduler internal;
354   SilcUnixSignal *signal_call;
355   int i;
356
357   internal = silc_scalloc(schedule->stack, 1, sizeof(*internal));
358   if (!internal)
359     return NULL;
360
361 #if defined(HAVE_EPOLL_WAIT)
362   internal->epfd = epoll_create(4);
363   if (internal->epfd < 0) {
364     SILC_LOG_ERROR(("epoll_create() failed: %s", strerror(errno)));
365     return NULL;
366   }
367   internal->fds = silc_calloc(4, sizeof(*internal->fds));
368   if (!internal->fds) {
369     close(internal->epfd);
370     return NULL;
371   }
372   internal->fds_count = 4;
373 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
374   getrlimit(RLIMIT_NOFILE, &internal->nofile);
375
376   if (schedule->max_tasks > 0) {
377     internal->nofile.rlim_cur = schedule->max_tasks;
378     if (schedule->max_tasks > internal->nofile.rlim_max)
379       internal->nofile.rlim_max = schedule->max_tasks;
380     setrlimit(RLIMIT_NOFILE, &internal->nofile);
381     getrlimit(RLIMIT_NOFILE, &internal->nofile);
382     schedule->max_tasks = internal->nofile.rlim_max;
383   }
384
385   internal->fds = silc_calloc(internal->nofile.rlim_cur,
386                               sizeof(*internal->fds));
387   if (!internal->fds)
388     return NULL;
389   internal->fds_count = internal->nofile.rlim_cur;
390 #endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */
391
392   sigemptyset(&internal->signals);
393
394 #ifdef SILC_THREADS
395   if (pipe(internal->wakeup_pipe)) {
396     SILC_LOG_ERROR(("pipe() fails: %s", strerror(errno)));
397     return NULL;
398   }
399
400   silc_schedule_task_add_timeout(schedule, silc_schedule_wakeup_init,
401                                  internal, 0, 0);
402 #endif /* SILC_THREADS */
403
404   internal->app_context = app_context;
405
406   signal_call = silc_global_get_var("srtsignals", TRUE);
407   if (!signal_call)
408     signal_call = silc_global_set_var("srtsignals",
409                                       sizeof(*signal_call) * SIGNAL_COUNT,
410                                       NULL, TRUE);
411   if (signal_call) {
412     for (i = 0; i < SIGNAL_COUNT; i++) {
413       signal_call[i].sig = 0;
414       signal_call[i].call = FALSE;
415       signal_call[i].schedule = schedule;
416     }
417   }
418
419   return (void *)internal;
420 }
421
422 void silc_schedule_internal_signals_block(SilcSchedule schedule,
423                                           void *context);
424 void silc_schedule_internal_signals_unblock(SilcSchedule schedule,
425                                             void *context);
426
427 /* Uninitializes the platform specific scheduler context. */
428
429 void silc_schedule_internal_uninit(SilcSchedule schedule, void *context)
430 {
431   SilcUnixScheduler internal = (SilcUnixScheduler)context;
432
433   if (!internal)
434     return;
435
436 #ifdef SILC_THREADS
437   close(internal->wakeup_pipe[0]);
438   close(internal->wakeup_pipe[1]);
439 #endif
440
441 #if defined(HAVE_EPOLL_WAIT)
442   close(internal->epfd);
443   silc_free(internal->fds);
444 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
445   silc_free(internal->fds);
446 #endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */
447
448   silc_global_del_var("srtsignals", TRUE);
449 }
450
451 /* Wakes up the scheduler */
452
453 void silc_schedule_internal_wakeup(SilcSchedule schedule, void *context)
454 {
455 #ifdef SILC_THREADS
456   SilcUnixScheduler internal = (SilcUnixScheduler)context;
457
458   if (!internal || !internal->wakeup_task)
459     return;
460
461   SILC_LOG_DEBUG(("Wakeup"));
462
463   (void)write(internal->wakeup_pipe[1], "!", 1);
464 #endif
465 }
466
467 /* Signal handler */
468
469 static void silc_schedule_internal_sighandler(int signal)
470 {
471   int i;
472   SilcUnixSignal *signal_call = silc_global_get_var("srtsignals", TRUE);
473
474   if (!signal_call)
475     return;
476
477   SILC_LOG_DEBUG(("Start"));
478
479   for (i = 0; i < SIGNAL_COUNT; i++) {
480     if (signal_call[i].sig == signal) {
481       signal_call[i].call = TRUE;
482       signal_call[i].schedule->signal_tasks = TRUE;
483       SILC_LOG_DEBUG(("Scheduling signal %d to be called",
484                       signal_call[i].sig));
485       break;
486     }
487   }
488 }
489
490 void silc_schedule_internal_signal_register(SilcSchedule schedule,
491                                             void *context,
492                                             SilcUInt32 sig,
493                                             SilcTaskCallback callback,
494                                             void *callback_context)
495 {
496   SilcUnixScheduler internal = (SilcUnixScheduler)context;
497   SilcUnixSignal *signal_call = silc_global_get_var("srtsignals", TRUE);
498   int i;
499
500   if (!internal || !signal_call)
501     return;
502
503   SILC_LOG_DEBUG(("Registering signal %d", sig));
504
505   silc_schedule_internal_signals_block(schedule, context);
506
507   for (i = 0; i < SIGNAL_COUNT; i++) {
508     if (!signal_call[i].sig) {
509       signal_call[i].sig = sig;
510       signal_call[i].callback = callback;
511       signal_call[i].context = callback_context;
512       signal_call[i].schedule = schedule;
513       signal_call[i].call = FALSE;
514       signal(sig, silc_schedule_internal_sighandler);
515       break;
516     }
517   }
518
519   silc_schedule_internal_signals_unblock(schedule, context);
520   sigaddset(&internal->signals, sig);
521 }
522
523 void silc_schedule_internal_signal_unregister(SilcSchedule schedule,
524                                               void *context,
525                                               SilcUInt32 sig)
526 {
527   SilcUnixScheduler internal = (SilcUnixScheduler)context;
528   SilcUnixSignal *signal_call = silc_global_get_var("srtsignals", TRUE);
529   int i;
530
531   if (!internal || !signal_call)
532     return;
533
534   SILC_LOG_DEBUG(("Unregistering signal %d", sig));
535
536   silc_schedule_internal_signals_block(schedule, context);
537
538   for (i = 0; i < SIGNAL_COUNT; i++) {
539     if (signal_call[i].sig == sig) {
540       signal_call[i].sig = 0;
541       signal_call[i].callback = NULL;
542       signal_call[i].context = NULL;
543       signal_call[i].schedule = NULL;
544       signal_call[i].call = FALSE;
545       signal(sig, SIG_DFL);
546     }
547   }
548
549   silc_schedule_internal_signals_unblock(schedule, context);
550   sigdelset(&internal->signals, sig);
551 }
552
553 /* Call all signals */
554
555 void silc_schedule_internal_signals_call(SilcSchedule schedule, void *context)
556 {
557   SilcUnixScheduler internal = (SilcUnixScheduler)context;
558   SilcUnixSignal *signal_call = silc_global_get_var("srtsignals", TRUE);
559   int i;
560
561   SILC_LOG_DEBUG(("Start"));
562
563   if (!internal || !signal_call)
564     return;
565
566   silc_schedule_internal_signals_block(schedule, context);
567
568   for (i = 0; i < SIGNAL_COUNT; i++) {
569     if (signal_call[i].call &&
570         signal_call[i].callback) {
571       SILC_LOG_DEBUG(("Calling signal %d callback",
572                       signal_call[i].sig));
573       silc_schedule_internal_signals_unblock(schedule, context);
574       signal_call[i].callback(schedule, internal->app_context,
575                               SILC_TASK_INTERRUPT,
576                               signal_call[i].sig,
577                               signal_call[i].context);
578       signal_call[i].call = FALSE;
579       silc_schedule_internal_signals_block(schedule, context);
580     }
581   }
582
583   silc_schedule_internal_signals_unblock(schedule, context);
584 }
585
586 /* Block registered signals in scheduler. */
587
588 void silc_schedule_internal_signals_block(SilcSchedule schedule, void *context)
589 {
590   SilcUnixScheduler internal = (SilcUnixScheduler)context;
591
592   if (!internal)
593     return;
594
595   sigprocmask(SIG_BLOCK, &internal->signals, &internal->signals_blocked);
596 }
597
598 /* Unblock registered signals in schedule. */
599
600 void silc_schedule_internal_signals_unblock(SilcSchedule schedule,
601                                             void *context)
602 {
603   SilcUnixScheduler internal = (SilcUnixScheduler)context;
604
605   if (!internal)
606     return;
607
608   sigprocmask(SIG_SETMASK, &internal->signals_blocked, NULL);
609 }
610
611 const SilcScheduleOps schedule_ops =
612 {
613   silc_schedule_internal_init,
614   silc_schedule_internal_uninit,
615 #if defined(HAVE_EPOLL_WAIT)
616   silc_epoll,
617 #elif defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE)
618   silc_poll,
619 #else
620   silc_select,
621 #endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */
622   silc_schedule_internal_schedule_fd,
623   silc_schedule_internal_wakeup,
624   silc_schedule_internal_signal_register,
625   silc_schedule_internal_signal_unregister,
626   silc_schedule_internal_signals_call,
627   silc_schedule_internal_signals_block,
628   silc_schedule_internal_signals_unblock,
629 };