X-Git-Url: http://git.silcnet.org/gitweb/?a=blobdiff_plain;f=lib%2Fsilcutil%2Funix%2Fsilcunixschedule.c;h=67e8d1d3f654c644ad66e6cfd8fab63da4b70dfc;hb=40f8443d8d3a6577336ee66d18e04d9ac4d956bb;hp=4c70827a5a290eaf4f3d93100b5a867885856139;hpb=262aba230e36a455a53fbde13f901a44472eae15;p=silc.git diff --git a/lib/silcutil/unix/silcunixschedule.c b/lib/silcutil/unix/silcunixschedule.c index 4c70827a..67e8d1d3 100644 --- a/lib/silcutil/unix/silcunixschedule.c +++ b/lib/silcutil/unix/silcunixschedule.c @@ -4,13 +4,12 @@ Author: Pekka Riikonen - Copyright (C) 1998 - 2001 Pekka Riikonen + Copyright (C) 1998 - 2005 Pekka Riikonen This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - + the Free Software Foundation; version 2 of the License. + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the @@ -19,95 +18,448 @@ */ /* $Id$ */ -#include "silcincludes.h" +#include "silc.h" -/* Calls normal select() system call. */ +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) +#include +#endif -int silc_select(int n, fd_set *readfds, fd_set *writefds, - fd_set *exceptfds, struct timeval *timeout) -{ - return select(n, readfds, writefds, exceptfds, timeout); -} +const SilcScheduleOps schedule_ops; -#ifdef SILC_THREADS +#define SIGNAL_COUNT 32 + +typedef struct { + SilcUInt32 signal; + SilcTaskCallback callback; + void *context; + SilcBool call; +} SilcUnixSignal; -/* Internal wakeup context. */ +/* Internal context. */ typedef struct { +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) + struct rlimit nofile; + struct pollfd *fds; + SilcUInt32 fds_count; +#endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */ + void *app_context; int wakeup_pipe[2]; SilcTask wakeup_task; -} *SilcUnixWakeup; + sigset_t signals; + sigset_t signals_blocked; + SilcUnixSignal signal_call[SIGNAL_COUNT]; +} *SilcUnixScheduler; + +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) + +/* Calls normal poll() system call. */ + +int silc_poll(SilcSchedule schedule, void *context) +{ + SilcUnixScheduler internal = context; + SilcHashTableList htl; + SilcTaskFd task; + struct pollfd *fds = internal->fds; + SilcUInt32 fds_count = internal->fds_count; + int fd, ret, i = 0, timeout = -1; + + silc_hash_table_list(schedule->fd_queue, &htl); + while (silc_hash_table_get(&htl, (void **)&fd, (void **)&task)) { + if (!task->events) + continue; + + /* Allocate larger fd table if needed */ + if (i >= fds_count) { + struct rlimit nofile; + + fds = silc_realloc(internal->fds, sizeof(*internal->fds) * + (fds_count + (fds_count / 2))); + if (!fds) + break; + internal->fds = fds; + internal->fds_count = fds_count = fds_count + (fds_count / 2); + internal->nofile.rlim_cur = fds_count; + if (fds_count > internal->nofile.rlim_max) + internal->nofile.rlim_max = fds_count; + if (setrlimit(RLIMIT_NOFILE, &nofile) < 0) + break; + } + + fds[i].fd = fd; + fds[i].events = 0; + task->revents = fds[i].revents = 0; + + if (task->events & SILC_TASK_READ) + fds[i].events |= (POLLIN | POLLPRI); + if (task->events & SILC_TASK_WRITE) + fds[i].events |= POLLOUT; + i++; + } + silc_hash_table_list_reset(&htl); + + if (schedule->has_timeout) + timeout = ((schedule->timeout.tv_sec * 1000) + + (schedule->timeout.tv_usec / 1000)); + + fds_count = i; + SILC_SCHEDULE_UNLOCK(schedule); + ret = poll(fds, fds_count, timeout); + SILC_SCHEDULE_LOCK(schedule); + if (ret <= 0) + return ret; + + for (i = 0; i < fds_count; i++) { + if (!fds[i].revents) + continue; + if (!silc_hash_table_find(schedule->fd_queue, SILC_32_TO_PTR(fds[i].fd), + NULL, (void **)&task)) + continue; + + fd = fds[i].revents; + if (fd & (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLNVAL)) + task->revents |= SILC_TASK_READ; + if (fd & POLLOUT) + task->revents |= SILC_TASK_WRITE; + } + + return ret; +} + +#else + +/* Calls normal select() system call. */ + +int silc_select(SilcSchedule schedule, void *context) +{ + SilcHashTableList htl; + SilcTaskFd task; + fd_set in, out; + int fd, max_fd = 0, ret; + + FD_ZERO(&in); + FD_ZERO(&out); + + silc_hash_table_list(schedule->fd_queue, &htl); + while (silc_hash_table_get(&htl, (void **)&fd, (void **)&task)) { + if (!task->events) + continue; + +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + break; +#endif /* FD_SETSIZE */ + + if (fd > max_fd) + max_fd = fd; + + if (task->events & SILC_TASK_READ) + FD_SET(fd, &in); + if (task->events & SILC_TASK_WRITE) + FD_SET(fd, &out); + + task->revents = 0; + } + silc_hash_table_list_reset(&htl); + + SILC_SCHEDULE_UNLOCK(schedule); + ret = select(max_fd + 1, &in, &out, NULL, (schedule->has_timeout ? + &schedule->timeout : NULL)); + SILC_SCHEDULE_LOCK(schedule); + if (ret <= 0) + return ret; + + silc_hash_table_list(schedule->fd_queue, &htl); + while (silc_hash_table_get(&htl, (void **)&fd, (void **)&task)) { + if (!task->events) + continue; + +#ifdef FD_SETSIZE + if (fd >= FD_SETSIZE) + break; +#endif /* FD_SETSIZE */ + + if (FD_ISSET(fd, &in)) + task->revents |= SILC_TASK_READ; + if (FD_ISSET(fd, &out)) + task->revents |= SILC_TASK_WRITE; + } + silc_hash_table_list_reset(&htl); + + return ret; +} + +#endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */ + +#ifdef SILC_THREADS SILC_TASK_CALLBACK(silc_schedule_wakeup_cb) { - SilcUnixWakeup wakeup = (SilcUnixWakeup)context; + SilcUnixScheduler internal = (SilcUnixScheduler)context; unsigned char c; - read(wakeup->wakeup_pipe[0], &c, 1); + SILC_LOG_DEBUG(("Wokeup")); + + read(internal->wakeup_pipe[0], &c, 1); } #endif /* SILC_THREADS */ -/* Initializes the wakeup of the scheduler. In multi-threaded environment +/* Initializes the platform specific scheduler. This for example initializes + the wakeup mechanism of the scheduler. In multi-threaded environment the scheduler needs to be wakenup when tasks are added or removed from - the task queues. This will initialize the wakeup for the scheduler. - Any tasks that needs to be registered must be registered to the `queue'. - It is quaranteed that the scheduler will automatically free any - registered tasks in this queue. This is system specific routine. */ + the task queues. Returns context to the platform specific scheduler. */ -void *silc_schedule_wakeup_init(void *queue) +void *silc_schedule_internal_init(SilcSchedule schedule, + void *app_context) { -#ifdef SILC_THREADS - SilcUnixWakeup wakeup; + SilcUnixScheduler internal; - wakeup = silc_calloc(1, sizeof(*wakeup)); - - if (pipe(wakeup->wakeup_pipe)) { - silc_free(wakeup); + internal = silc_calloc(1, sizeof(*internal)); + if (!internal) return NULL; + +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) + getrlimit(RLIMIT_NOFILE, &internal->nofile); + + if (schedule->max_tasks > 0) { + internal->nofile.rlim_cur = schedule->max_tasks; + if (schedule->max_tasks > internal->nofile.rlim_max) + internal->nofile.rlim_max = schedule->max_tasks; + setrlimit(RLIMIT_NOFILE, &internal->nofile); + getrlimit(RLIMIT_NOFILE, &internal->nofile); + schedule->max_tasks = internal->nofile.rlim_max; } - wakeup->wakeup_task = silc_task_register(queue, wakeup->wakeup_pipe[0], - silc_schedule_wakeup_cb, wakeup, - 0, 0, SILC_TASK_FD, - SILC_TASK_PRI_NORMAL); - if (!wakeup->wakeup_task) { - close(wakeup->wakeup_pipe[0]); - close(wakeup->wakeup_pipe[1]); - silc_free(wakeup); + internal->fds = silc_calloc(internal->nofile.rlim_cur, + sizeof(*internal->fds)); + if (!internal->fds) + return NULL; + internal->fds_count = internal->nofile.rlim_cur; +#endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */ + + sigemptyset(&internal->signals); + +#ifdef SILC_THREADS + if (pipe(internal->wakeup_pipe)) { + SILC_LOG_ERROR(("pipe() fails: %s", strerror(errno))); + silc_free(internal); return NULL; } - return (void *)wakeup; + internal->wakeup_task = + silc_schedule_task_add(schedule, internal->wakeup_pipe[0], + silc_schedule_wakeup_cb, internal, + 0, 0, SILC_TASK_FD); + if (!internal->wakeup_task) { + SILC_LOG_ERROR(("Could not add a wakeup task, threads won't work")); + close(internal->wakeup_pipe[0]); + close(internal->wakeup_pipe[1]); + silc_free(internal); + return NULL; + } #endif - return NULL; + + internal->app_context = app_context; + + return (void *)internal; } -/* Uninitializes the system specific wakeup. */ +void silc_schedule_internal_signals_block(SilcSchedule schedule, + void *context); +void silc_schedule_internal_signals_unblock(SilcSchedule schedule, + void *context); + +/* Uninitializes the platform specific scheduler context. */ -void silc_schedule_wakeup_uninit(void *context) +void silc_schedule_internal_uninit(SilcSchedule schedule, void *context) { -#ifdef SILC_THREADS - SilcUnixWakeup wakeup = (SilcUnixWakeup)context; + SilcUnixScheduler internal = (SilcUnixScheduler)context; - if (!wakeup) + if (!internal) return; - close(wakeup->wakeup_pipe[0]); - close(wakeup->wakeup_pipe[1]); - silc_free(wakeup); +#ifdef SILC_THREADS + close(internal->wakeup_pipe[0]); + close(internal->wakeup_pipe[1]); #endif + +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) + silc_free(internal->fds); +#endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */ + + silc_free(internal); } /* Wakes up the scheduler */ -void silc_schedule_wakeup_internal(void *context) +void silc_schedule_internal_wakeup(SilcSchedule schedule, void *context) { #ifdef SILC_THREADS - SilcUnixWakeup wakeup = (SilcUnixWakeup)context; + SilcUnixScheduler internal = (SilcUnixScheduler)context; - if (!wakeup) + if (!internal) return; - write(wakeup->wakeup_pipe[1], "!", 1); + SILC_LOG_DEBUG(("Wakeup")); + + write(internal->wakeup_pipe[1], "!", 1); #endif } + +void silc_schedule_internal_signal_register(SilcSchedule schedule, + void *context, + SilcUInt32 signal, + SilcTaskCallback callback, + void *callback_context) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + int i; + + if (!internal) + return; + + SILC_LOG_DEBUG(("Registering signal %d", signal)); + + silc_schedule_internal_signals_block(schedule, context); + + for (i = 0; i < SIGNAL_COUNT; i++) { + if (!internal->signal_call[i].signal) { + internal->signal_call[i].signal = signal; + internal->signal_call[i].callback = callback; + internal->signal_call[i].context = callback_context; + internal->signal_call[i].call = FALSE; + break; + } + } + + silc_schedule_internal_signals_unblock(schedule, context); + sigaddset(&internal->signals, signal); +} + +void silc_schedule_internal_signal_unregister(SilcSchedule schedule, + void *context, + SilcUInt32 signal, + SilcTaskCallback callback, + void *callback_context) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + int i; + + if (!internal) + return; + + SILC_LOG_DEBUG(("Unregistering signal %d", signal)); + + silc_schedule_internal_signals_block(schedule, context); + + for (i = 0; i < SIGNAL_COUNT; i++) { + if (internal->signal_call[i].signal == signal && + internal->signal_call[i].callback == callback && + internal->signal_call[i].context == callback_context) { + internal->signal_call[i].signal = 0; + internal->signal_call[i].callback = NULL; + internal->signal_call[i].context = NULL; + internal->signal_call[i].call = FALSE; + } + } + + silc_schedule_internal_signals_unblock(schedule, context); + sigdelset(&internal->signals, signal); +} + +/* Mark signal to be called later. */ + +void silc_schedule_internal_signal_call(SilcSchedule schedule, + void *context, SilcUInt32 signal) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + int i; + + if (!internal) + return; + + silc_schedule_internal_signals_block(schedule, context); + + for (i = 0; i < SIGNAL_COUNT; i++) { + if (internal->signal_call[i].signal == signal) { + internal->signal_call[i].call = TRUE; + SILC_LOG_DEBUG(("Scheduling signal %d to be called", + internal->signal_call[i].signal)); + } + } + + silc_schedule_internal_signals_unblock(schedule, context); +} + +/* Call all signals */ + +void silc_schedule_internal_signals_call(SilcSchedule schedule, void *context) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + int i; + + SILC_LOG_DEBUG(("Start")); + + if (!internal) + return; + + silc_schedule_internal_signals_block(schedule, context); + + for (i = 0; i < SIGNAL_COUNT; i++) { + if (internal->signal_call[i].call && + internal->signal_call[i].callback) { + SILC_LOG_DEBUG(("Calling signal %d callback", + internal->signal_call[i].signal)); + internal->signal_call[i].callback(schedule, internal->app_context, + SILC_TASK_INTERRUPT, + internal->signal_call[i].signal, + internal->signal_call[i].context); + internal->signal_call[i].call = FALSE; + } + } + + silc_schedule_internal_signals_unblock(schedule, context); +} + +/* Block registered signals in scheduler. */ + +void silc_schedule_internal_signals_block(SilcSchedule schedule, void *context) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + + if (!internal) + return; + + sigprocmask(SIG_BLOCK, &internal->signals, &internal->signals_blocked); +} + +/* Unblock registered signals in schedule. */ + +void silc_schedule_internal_signals_unblock(SilcSchedule schedule, + void *context) +{ + SilcUnixScheduler internal = (SilcUnixScheduler)context; + + if (!internal) + return; + + sigprocmask(SIG_SETMASK, &internal->signals_blocked, NULL); +} + +const SilcScheduleOps schedule_ops = +{ + silc_schedule_internal_init, + silc_schedule_internal_uninit, +#if defined(HAVE_POLL) && defined(HAVE_SETRLIMIT) && defined(RLIMIT_NOFILE) + silc_poll, +#else + silc_select, +#endif /* HAVE_POLL && HAVE_SETRLIMIT && RLIMIT_NOFILE */ + silc_schedule_internal_wakeup, + silc_schedule_internal_signal_register, + silc_schedule_internal_signal_unregister, + silc_schedule_internal_signal_call, + silc_schedule_internal_signals_call, + silc_schedule_internal_signals_block, + silc_schedule_internal_signals_unblock, +};