diff options
Diffstat (limited to 'src/event/modules')
| -rw-r--r-- | src/event/modules/ngx_devpoll_module.c | 29 | ||||
| -rw-r--r-- | src/event/modules/ngx_epoll_module.c | 137 | ||||
| -rw-r--r-- | src/event/modules/ngx_iocp_module.c | 3 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 5 | ||||
| -rw-r--r-- | src/event/modules/ngx_poll_module.c | 27 | ||||
| -rw-r--r-- | src/event/modules/ngx_rtsig_module.c | 160 | ||||
| -rw-r--r-- | src/event/modules/ngx_select_module.c | 7 |
7 files changed, 252 insertions, 116 deletions
diff --git a/src/event/modules/ngx_devpoll_module.c b/src/event/modules/ngx_devpoll_module.c index 6704deb0b..29da2fb05 100644 --- a/src/event/modules/ngx_devpoll_module.c +++ b/src/event/modules/ngx_devpoll_module.c @@ -322,6 +322,7 @@ int ngx_devpoll_process_events(ngx_cycle_t *cycle) ngx_msec_t timer; ngx_err_t err; ngx_cycle_t **old_cycle; + ngx_event_t *rev, *wev; ngx_connection_t *c; ngx_epoch_msec_t delta; struct dvpoll dvp; @@ -476,16 +477,16 @@ int ngx_devpoll_process_events(ngx_cycle_t *cycle) event_list[i].events, event_list[i].revents); } - if ((event_list[i].events & (POLLOUT|POLLERR|POLLHUP)) - && c->write->active) - { - c->write->ready = 1; + wev = c->write; + + if ((event_list[i].events & (POLLOUT|POLLERR|POLLHUP)) && wev->active) { + wev->ready = 1; if (!ngx_threaded && !ngx_accept_mutex_held) { - c->write->event_handler(c->write); + wev->event_handler(wev); } else { - ngx_post_event(c->write); + ngx_post_event(wev); } } @@ -495,21 +496,21 @@ int ngx_devpoll_process_events(ngx_cycle_t *cycle) * if the accept event is the last one. */ - if ((event_list[i].events & (POLLIN|POLLERR|POLLHUP)) - && c->read->active) - { - c->read->ready = 1; + rev = c->read; + + if ((event_list[i].events & (POLLIN|POLLERR|POLLHUP)) && rev->active) { + rev->ready = 1; if (!ngx_threaded && !ngx_accept_mutex_held) { - c->read->event_handler(c->read); + rev->event_handler(rev); - } else if (!c->read->accept) { - ngx_post_event(c->read); + } else if (!rev->accept) { + ngx_post_event(rev); } else if (ngx_accept_disabled <= 0) { ngx_mutex_unlock(ngx_posted_events_mutex); - c->read->event_handler(c->read); + c->read->event_handler(rev); if (ngx_accept_disabled > 0) { ngx_accept_mutex_unlock(); diff --git a/src/event/modules/ngx_epoll_module.c b/src/event/modules/ngx_epoll_module.c index 6f3e57351..fac28f158 100644 --- a/src/event/modules/ngx_epoll_module.c +++ b/src/event/modules/ngx_epoll_module.c @@ -75,7 +75,7 @@ static void ngx_epoll_done(ngx_cycle_t *cycle); static int ngx_epoll_add_event(ngx_event_t *ev, int event, u_int flags); static int ngx_epoll_del_event(ngx_event_t *ev, int event, u_int flags); static int ngx_epoll_add_connection(ngx_connection_t *c); -static int ngx_epoll_del_connection(ngx_connection_t *c); +static int ngx_epoll_del_connection(ngx_connection_t *c, u_int flags); static int ngx_epoll_process_events(ngx_cycle_t *cycle); static void *ngx_epoll_create_conf(ngx_cycle_t *cycle); @@ -111,8 +111,8 @@ ngx_event_module_t ngx_epoll_module_ctx = { ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ - NULL, /* add an connection */ - NULL, /* delete an connection */ + ngx_epoll_add_connection, /* add an connection */ + ngx_epoll_del_connection, /* delete an connection */ NULL, /* process the changes */ ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ @@ -124,9 +124,9 @@ ngx_module_t ngx_epoll_module = { NGX_MODULE, &ngx_epoll_module_ctx, /* module context */ ngx_epoll_commands, /* module directives */ - NGX_EVENT_MODULE, /* module type */ - NULL, /* init module */ - NULL /* init process */ + NGX_EVENT_MODULE, /* module type */ + NULL, /* init module */ + NULL /* init process */ }; @@ -174,7 +174,7 @@ static int ngx_epoll_init(ngx_cycle_t *cycle) ngx_event_flags = NGX_USE_LEVEL_EVENT #endif |NGX_HAVE_GREEDY_EVENT - |NGX_HAVE_INSTANCE_EVENT; + |NGX_USE_EPOLL_EVENT; return NGX_OK; } @@ -306,7 +306,6 @@ static int ngx_epoll_del_event(ngx_event_t *ev, int event, u_int flags) } -#if 0 static int ngx_epoll_add_connection(ngx_connection_t *c) { struct epoll_event ee; @@ -330,14 +329,41 @@ static int ngx_epoll_add_connection(ngx_connection_t *c) } -static int ngx_epoll_del_connection(ngx_connection_t *c) +static int ngx_epoll_del_connection(ngx_connection_t *c, u_int flags) { + int op; + struct epoll_event ee; + + /* + * when the file descriptor is closed the epoll automatically deletes + * it from its queue so we do not need to delete explicity the event + * before the closing the file descriptor + */ + + if (flags & NGX_CLOSE_EVENT) { + c->read->active = 0; + c->write->active = 0; + return NGX_OK; + } + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + "epoll del connection: fd:%d", c->fd); + + op = EPOLL_CTL_DEL; + ee.events = 0; + ee.data.ptr = NULL; + + if (epoll_ctl(ep, op, c->fd, &ee) == -1) { + ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno, + "epoll_ctl(%d, %d) failed", op, c->fd); + return NGX_ERROR; + } + c->read->active = 0; c->write->active = 0; return NGX_OK; } -#endif int ngx_epoll_process_events(ngx_cycle_t *cycle) @@ -349,6 +375,7 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) ngx_err_t err; ngx_log_t *log; ngx_msec_t timer; + ngx_event_t *rev, *wev; struct timeval tv; ngx_connection_t *c; ngx_epoch_msec_t delta; @@ -356,6 +383,19 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) for ( ;; ) { timer = ngx_event_find_timer(); +#if (NGX_THREADS) + + if (timer == NGX_TIMER_ERROR) { + return NGX_ERROR; + } + + if (timer == NGX_TIMER_INFINITE || timer > 500) { + timer = 500; + break; + } + +#endif + if (timer != 0) { break; } @@ -365,6 +405,10 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) ngx_event_expire_timers((ngx_msec_t) (ngx_elapsed_msec - ngx_old_elapsed_msec)); + + if (ngx_posted_events && ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + } } /* NGX_TIMER_INFINITE == INFTIM */ @@ -438,12 +482,18 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) return NGX_ERROR; } - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; + if (events > 0) { + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + ngx_accept_mutex_unlock(); + return NGX_ERROR; + } + + lock = 1; + + } else { + lock =0; } - lock = 1; log = cycle->log; for (i = 0; i < events; i++) { @@ -452,15 +502,9 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); - if (event_list[i].events & EPOLLIN) { - c->read->returned_instance = instance; - } - - if (event_list[i].events & EPOLLOUT) { - c->write->returned_instance = instance; - } + rev = c->read; - if (c->read->instance != instance) { + if (c->fd == -1 || rev->instance != instance) { /* * the stale event from a file descriptor @@ -492,16 +536,24 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) c->fd, event_list[i].events); } + wev = c->write; + if ((event_list[i].events & (EPOLLOUT|EPOLLERR|EPOLLHUP)) - && c->write->active) + && wev->active) { - c->write->ready = 1; - - if (!ngx_threaded && !ngx_accept_mutex_held) { - c->write->event_handler(c->write); + if (ngx_threaded) { + wev->posted_ready = 1; + ngx_post_event(wev); } else { - ngx_post_event(c->write); + wev->ready = 1; + + if (!ngx_accept_mutex_held) { + wev->event_handler(wev); + + } else { + ngx_post_event(wev); + } } } @@ -512,21 +564,29 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) */ if ((event_list[i].events & (EPOLLIN|EPOLLERR|EPOLLHUP)) - && c->read->active) + && rev->active) { - c->read->ready = 1; + if (ngx_threaded && !rev->accept) { + rev->posted_ready = 1; + + ngx_post_event(rev); + + continue; + } + + rev->ready = 1; if (!ngx_threaded && !ngx_accept_mutex_held) { - c->read->event_handler(c->read); + rev->event_handler(rev); - } else if (!c->read->accept) { - ngx_post_event(c->read); + } else if (!rev->accept) { + ngx_post_event(rev); } else if (ngx_accept_disabled <= 0) { ngx_mutex_unlock(ngx_posted_events_mutex); - c->read->event_handler(c->read); + rev->event_handler(rev); if (ngx_accept_disabled > 0) { ngx_accept_mutex_unlock(); @@ -560,8 +620,13 @@ int ngx_epoll_process_events(ngx_cycle_t *cycle) ngx_event_expire_timers((ngx_msec_t) delta); } - if (!ngx_threaded) { - ngx_event_process_posted(cycle); + if (ngx_posted_events) { + if (ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + + } else { + ngx_event_process_posted(cycle); + } } return NGX_OK; diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c index 3a93825e8..4506cbc56 100644 --- a/src/event/modules/ngx_iocp_module.c +++ b/src/event/modules/ngx_iocp_module.c @@ -60,6 +60,7 @@ ngx_event_module_t ngx_iocp_module_ctx = { NULL, /* disable an event */ NULL, /* add an connection */ ngx_iocp_del_connection, /* delete an connection */ + NULL, /* process the changes */ ngx_iocp_process_events, /* process the events */ ngx_iocp_init, /* init the events */ ngx_iocp_done /* done the events */ @@ -73,7 +74,7 @@ ngx_module_t ngx_iocp_module = { ngx_iocp_commands, /* module directives */ NGX_EVENT_MODULE, /* module type */ NULL, /* init module */ - NULL /* init child */ + NULL /* init process */ }; diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index 2f722f781..99fffefee 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -421,6 +421,7 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) timer = ngx_event_find_timer(); #if (NGX_THREADS) + if (timer == NGX_TIMER_ERROR) { return NGX_ERROR; } @@ -442,7 +443,9 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) ngx_event_expire_timers((ngx_msec_t) (ngx_elapsed_msec - ngx_old_elapsed_msec)); - /* TODO: if ngx_threaded then wake up the worker thread */ + if (ngx_posted_events && ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + } } ngx_old_elapsed_msec = ngx_elapsed_msec; diff --git a/src/event/modules/ngx_poll_module.c b/src/event/modules/ngx_poll_module.c index 2170d0e02..75172f1d7 100644 --- a/src/event/modules/ngx_poll_module.c +++ b/src/event/modules/ngx_poll_module.c @@ -14,6 +14,7 @@ static void ngx_poll_done(ngx_cycle_t *cycle); static ngx_int_t ngx_poll_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_poll_process_events(ngx_cycle_t *cycle); +static char *ngx_poll_init_conf(ngx_cycle_t *cycle, void *conf); static struct pollfd *event_list; @@ -31,7 +32,7 @@ static ngx_str_t poll_name = ngx_string("poll"); ngx_event_module_t ngx_poll_module_ctx = { &poll_name, NULL, /* create configuration */ - NULL, /* init configuration */ + ngx_poll_init_conf, /* init configuration */ { ngx_poll_add_event, /* add an event */ @@ -577,3 +578,27 @@ static ngx_int_t ngx_poll_process_events(ngx_cycle_t *cycle) return nready; } + + +static char *ngx_poll_init_conf(ngx_cycle_t *cycle, void *conf) +{ + ngx_event_conf_t *ecf; + + ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module); + + if (ecf->use != ngx_poll_module.ctx_index) { + return NGX_CONF_OK; + } + +#if (NGX_THREADS) + + ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, + "poll() is not supported in the threaded mode"); + return NGX_CONF_ERROR; + +#else + + return NGX_CONF_OK; + +#endif +} diff --git a/src/event/modules/ngx_rtsig_module.c b/src/event/modules/ngx_rtsig_module.c index d0e7fee7d..9d1e1b9c3 100644 --- a/src/event/modules/ngx_rtsig_module.c +++ b/src/event/modules/ngx_rtsig_module.c @@ -158,9 +158,7 @@ static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle) ngx_event_actions = ngx_rtsig_module_ctx.actions; - ngx_event_flags = NGX_USE_RTSIG_EVENT - |NGX_HAVE_GREEDY_EVENT - |NGX_HAVE_INSTANCE_EVENT; + ngx_event_flags = NGX_USE_RTSIG_EVENT|NGX_HAVE_GREEDY_EVENT; return NGX_OK; } @@ -275,6 +273,7 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) ngx_msec_t timer; ngx_err_t err; siginfo_t si; + ngx_event_t *rev, *wev; struct timeval tv; struct timespec ts, *tp; struct sigaction sa; @@ -290,6 +289,19 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) for ( ;; ) { timer = ngx_event_find_timer(); +#if (NGX_THREADS) + + if (timer == NGX_TIMER_ERROR) { + return NGX_ERROR; + } + + if (timer == NGX_TIMER_INFINITE || timer > 500) { + timer = 500; + break; + } + +#endif + if (timer != 0) { break; } @@ -299,6 +311,10 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) ngx_event_expire_timers((ngx_msec_t) (ngx_elapsed_msec - ngx_old_elapsed_msec)); + + if (ngx_posted_events && ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + } } expire = 1; @@ -340,7 +356,7 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "rtsig timer: %d", timer); - /* Linux sigwaitinfo() is sigtimedwait() with the NULL timeout pointer */ + /* Linux's sigwaitinfo() is sigtimedwait() with the NULL timeout pointer */ signo = sigtimedwait(&set, &si, tp); @@ -400,14 +416,8 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) instance = signo - rtscf->signo; - if (si.si_band & POLLIN) { - c->read->returned_instance = instance; - } + rev = c->read; - if (si.si_band & POLLOUT) { - c->write->returned_instance = instance; - } - if (c->read->instance != instance) { /* @@ -424,47 +434,62 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) } if (si.si_band & (POLLIN|POLLHUP|POLLERR)) { - if (c->read->active) { - c->read->ready = 1; - - if (!ngx_threaded && !ngx_accept_mutex_held) { - c->read->event_handler(c->read); + if (rev->active) { - } else if (c->read->accept) { - if (ngx_accept_disabled <= 0) { - c->read->event_handler(c->read); - } - - } else { + if (ngx_threaded && !rev->accept) { if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { ngx_accept_mutex_unlock(); return NGX_ERROR; } - ngx_post_event(c->read); + rev->posted_ready = 1; + ngx_post_event(rev); ngx_mutex_unlock(ngx_posted_events_mutex); + + } else { + rev->ready = 1; + + if (!ngx_threaded && !ngx_accept_mutex_held) { + rev->event_handler(rev); + + } else if (rev->accept) { + if (ngx_accept_disabled <= 0) { + rev->event_handler(rev); + } + + } else { + ngx_post_event(rev); + } } } } - if (si.si_band & (POLLOUT|POLLHUP|POLLERR)) { - if (c->write->active) { - c->write->ready = 1; - - if (!ngx_threaded && !ngx_accept_mutex_held) { - c->write->event_handler(c->write); + wev = c->write; - } else { + if (si.si_band & (POLLOUT|POLLHUP|POLLERR)) { + if (wev->active) { + if (ngx_threaded) { if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { ngx_accept_mutex_unlock(); return NGX_ERROR; } - ngx_post_event(c->write); + wev->posted_ready = 1; + ngx_post_event(wev); ngx_mutex_unlock(ngx_posted_events_mutex); + + } else { + wev->ready = 1; + + if (!ngx_threaded && !ngx_accept_mutex_held) { + wev->event_handler(wev); + + } else { + ngx_post_event(wev); + } } } } @@ -512,8 +537,13 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle) ngx_event_expire_timers((ngx_msec_t) delta); } - if (!ngx_threaded) { - ngx_event_process_posted(cycle); + if (ngx_posted_events) { + if (ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + + } else { + ngx_event_process_posted(cycle); + } } if (signo == -1) { @@ -532,6 +562,7 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle) size_t len; ngx_int_t tested, n, i; ngx_err_t err; + ngx_event_t *rev, *wev; ngx_connection_t *c; ngx_rtsig_conf_t *rtscf; @@ -587,60 +618,60 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle) cycle->log, 0, "poll() failed while the overflow recover"); - if (err == NGX_EINTR) { - continue; + if (err != NGX_EINTR) { + break; } } - - break; } if (ready <= 0) { continue; } + if (n) { + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + return NGX_ERROR; + } + } + for (i = 0; i < n; i++) { c = &cycle->connections[overflow_list[i].fd]; + rev = c->read; + if (overflow_list[i].revents & (POLLIN|POLLERR|POLLHUP|POLLNVAL)) { tested++; - c->read->ready = 1; - if (!ngx_threaded) { - c->read->event_handler(c->read); + if (ngx_threaded) { + rev->posted_ready = 1; + ngx_post_event(rev); } else { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - return NGX_ERROR; - } - - ngx_post_event(c->read); - c->read->returned_instance = c->read->instance; - - ngx_mutex_unlock(ngx_posted_events_mutex); + rev->ready = 1; + rev->event_handler(rev); } } + wev = c->write; + if (overflow_list[i].revents & (POLLOUT|POLLERR|POLLHUP|POLLNVAL)) { tested++; - c->write->ready = 1; - - if (!ngx_threaded) { - c->write->event_handler(c->write); - - } else { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - return NGX_ERROR; - } - ngx_post_event(c->write); - c->write->returned_instance = c->write->instance; + if (ngx_threaded) { + wev->posted_ready = 1; + ngx_post_event(wev); - ngx_mutex_unlock(ngx_posted_events_mutex); + } else { + wev->ready = 1; + wev->event_handler(wev); } } } + if (n) { + ngx_mutex_unlock(ngx_posted_events_mutex); + } + if (tested >= rtscf->overflow_test) { /* @@ -683,8 +714,13 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle) } } - if (!ngx_threaded) { - ngx_event_process_posted(cycle); + if (ngx_posted_events) { + if (ngx_threaded) { + ngx_wakeup_worker_thread(cycle); + + } else { + ngx_event_process_posted(cycle); + } } ngx_log_error(NGX_LOG_INFO, cycle->log, 0, diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c index 1c30d82f9..e32f1dbe0 100644 --- a/src/event/modules/ngx_select_module.c +++ b/src/event/modules/ngx_select_module.c @@ -15,7 +15,6 @@ static void ngx_select_done(ngx_cycle_t *cycle); static ngx_int_t ngx_select_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_select_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_select_process_events(ngx_cycle_t *cycle); - static char *ngx_select_init_conf(ngx_cycle_t *cycle, void *conf); @@ -605,5 +604,11 @@ static char *ngx_select_init_conf(ngx_cycle_t *cycle, void *conf) } #endif +#if (NGX_THREADS) + ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, + "select() is not supported in the threaded mode"); + return NGX_CONF_ERROR; +#else return NGX_CONF_OK; +#endif } |
