summaryrefslogtreecommitdiffhomepage
path: root/src/event/modules/ngx_rtsig_module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/modules/ngx_rtsig_module.c')
-rw-r--r--src/event/modules/ngx_rtsig_module.c160
1 files changed, 98 insertions, 62 deletions
diff --git a/src/event/modules/ngx_rtsig_module.c b/src/event/modules/ngx_rtsig_module.c
index d0e7fee7d..9d1e1b9c3 100644
--- a/src/event/modules/ngx_rtsig_module.c
+++ b/src/event/modules/ngx_rtsig_module.c
@@ -158,9 +158,7 @@ static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle)
ngx_event_actions = ngx_rtsig_module_ctx.actions;
- ngx_event_flags = NGX_USE_RTSIG_EVENT
- |NGX_HAVE_GREEDY_EVENT
- |NGX_HAVE_INSTANCE_EVENT;
+ ngx_event_flags = NGX_USE_RTSIG_EVENT|NGX_HAVE_GREEDY_EVENT;
return NGX_OK;
}
@@ -275,6 +273,7 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
ngx_msec_t timer;
ngx_err_t err;
siginfo_t si;
+ ngx_event_t *rev, *wev;
struct timeval tv;
struct timespec ts, *tp;
struct sigaction sa;
@@ -290,6 +289,19 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
for ( ;; ) {
timer = ngx_event_find_timer();
+#if (NGX_THREADS)
+
+ if (timer == NGX_TIMER_ERROR) {
+ return NGX_ERROR;
+ }
+
+ if (timer == NGX_TIMER_INFINITE || timer > 500) {
+ timer = 500;
+ break;
+ }
+
+#endif
+
if (timer != 0) {
break;
}
@@ -299,6 +311,10 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
ngx_event_expire_timers((ngx_msec_t)
(ngx_elapsed_msec - ngx_old_elapsed_msec));
+
+ if (ngx_posted_events && ngx_threaded) {
+ ngx_wakeup_worker_thread(cycle);
+ }
}
expire = 1;
@@ -340,7 +356,7 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"rtsig timer: %d", timer);
- /* Linux sigwaitinfo() is sigtimedwait() with the NULL timeout pointer */
+ /* Linux's sigwaitinfo() is sigtimedwait() with the NULL timeout pointer */
signo = sigtimedwait(&set, &si, tp);
@@ -400,14 +416,8 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
instance = signo - rtscf->signo;
- if (si.si_band & POLLIN) {
- c->read->returned_instance = instance;
- }
+ rev = c->read;
- if (si.si_band & POLLOUT) {
- c->write->returned_instance = instance;
- }
-
if (c->read->instance != instance) {
/*
@@ -424,47 +434,62 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
}
if (si.si_band & (POLLIN|POLLHUP|POLLERR)) {
- if (c->read->active) {
- c->read->ready = 1;
-
- if (!ngx_threaded && !ngx_accept_mutex_held) {
- c->read->event_handler(c->read);
+ if (rev->active) {
- } else if (c->read->accept) {
- if (ngx_accept_disabled <= 0) {
- c->read->event_handler(c->read);
- }
-
- } else {
+ if (ngx_threaded && !rev->accept) {
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
ngx_accept_mutex_unlock();
return NGX_ERROR;
}
- ngx_post_event(c->read);
+ rev->posted_ready = 1;
+ ngx_post_event(rev);
ngx_mutex_unlock(ngx_posted_events_mutex);
+
+ } else {
+ rev->ready = 1;
+
+ if (!ngx_threaded && !ngx_accept_mutex_held) {
+ rev->event_handler(rev);
+
+ } else if (rev->accept) {
+ if (ngx_accept_disabled <= 0) {
+ rev->event_handler(rev);
+ }
+
+ } else {
+ ngx_post_event(rev);
+ }
}
}
}
- if (si.si_band & (POLLOUT|POLLHUP|POLLERR)) {
- if (c->write->active) {
- c->write->ready = 1;
-
- if (!ngx_threaded && !ngx_accept_mutex_held) {
- c->write->event_handler(c->write);
+ wev = c->write;
- } else {
+ if (si.si_band & (POLLOUT|POLLHUP|POLLERR)) {
+ if (wev->active) {
+ if (ngx_threaded) {
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
ngx_accept_mutex_unlock();
return NGX_ERROR;
}
- ngx_post_event(c->write);
+ wev->posted_ready = 1;
+ ngx_post_event(wev);
ngx_mutex_unlock(ngx_posted_events_mutex);
+
+ } else {
+ wev->ready = 1;
+
+ if (!ngx_threaded && !ngx_accept_mutex_held) {
+ wev->event_handler(wev);
+
+ } else {
+ ngx_post_event(wev);
+ }
}
}
}
@@ -512,8 +537,13 @@ ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle)
ngx_event_expire_timers((ngx_msec_t) delta);
}
- if (!ngx_threaded) {
- ngx_event_process_posted(cycle);
+ if (ngx_posted_events) {
+ if (ngx_threaded) {
+ ngx_wakeup_worker_thread(cycle);
+
+ } else {
+ ngx_event_process_posted(cycle);
+ }
}
if (signo == -1) {
@@ -532,6 +562,7 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle)
size_t len;
ngx_int_t tested, n, i;
ngx_err_t err;
+ ngx_event_t *rev, *wev;
ngx_connection_t *c;
ngx_rtsig_conf_t *rtscf;
@@ -587,60 +618,60 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle)
cycle->log, 0,
"poll() failed while the overflow recover");
- if (err == NGX_EINTR) {
- continue;
+ if (err != NGX_EINTR) {
+ break;
}
}
-
- break;
}
if (ready <= 0) {
continue;
}
+ if (n) {
+ if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+ }
+
for (i = 0; i < n; i++) {
c = &cycle->connections[overflow_list[i].fd];
+ rev = c->read;
+
if (overflow_list[i].revents & (POLLIN|POLLERR|POLLHUP|POLLNVAL)) {
tested++;
- c->read->ready = 1;
- if (!ngx_threaded) {
- c->read->event_handler(c->read);
+ if (ngx_threaded) {
+ rev->posted_ready = 1;
+ ngx_post_event(rev);
} else {
- if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
- return NGX_ERROR;
- }
-
- ngx_post_event(c->read);
- c->read->returned_instance = c->read->instance;
-
- ngx_mutex_unlock(ngx_posted_events_mutex);
+ rev->ready = 1;
+ rev->event_handler(rev);
}
}
+ wev = c->write;
+
if (overflow_list[i].revents & (POLLOUT|POLLERR|POLLHUP|POLLNVAL)) {
tested++;
- c->write->ready = 1;
-
- if (!ngx_threaded) {
- c->write->event_handler(c->write);
-
- } else {
- if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
- return NGX_ERROR;
- }
- ngx_post_event(c->write);
- c->write->returned_instance = c->write->instance;
+ if (ngx_threaded) {
+ wev->posted_ready = 1;
+ ngx_post_event(wev);
- ngx_mutex_unlock(ngx_posted_events_mutex);
+ } else {
+ wev->ready = 1;
+ wev->event_handler(wev);
}
}
}
+ if (n) {
+ ngx_mutex_unlock(ngx_posted_events_mutex);
+ }
+
if (tested >= rtscf->overflow_test) {
/*
@@ -683,8 +714,13 @@ static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle)
}
}
- if (!ngx_threaded) {
- ngx_event_process_posted(cycle);
+ if (ngx_posted_events) {
+ if (ngx_threaded) {
+ ngx_wakeup_worker_thread(cycle);
+
+ } else {
+ ngx_event_process_posted(cycle);
+ }
}
ngx_log_error(NGX_LOG_INFO, cycle->log, 0,