diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 11 | ||||
| -rw-r--r-- | src/event/ngx_event.c | 3 | ||||
| -rw-r--r-- | src/event/ngx_event.h | 5 | ||||
| -rw-r--r-- | src/event/ngx_event_accept.c | 5 | ||||
| -rw-r--r-- | src/event/ngx_event_posted.c | 80 | ||||
| -rw-r--r-- | src/event/ngx_event_posted.h | 9 |
6 files changed, 88 insertions, 25 deletions
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index 4a844e910..001d6a85e 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -582,14 +582,17 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) ngx_mutex_unlock(ngx_posted_events_mutex); } - /* TODO: wake up worker thread */ - if (expire && delta) { ngx_event_expire_timers((ngx_msec_t) delta); } - if (!ngx_threaded) { - ngx_event_process_posted(cycle); + if (ngx_posted_events) { + if (ngx_threaded) { + ngx_cv_signal(ngx_posted_events_cv); + + } else { + ngx_event_process_posted(cycle); + } } return NGX_OK; diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c index 6d497e08c..58a5f0f70 100644 --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -265,6 +265,9 @@ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) for (i = 0; i < cycle->connection_n; i++) { c[i].fd = (ngx_socket_t) -1; c[i].data = NULL; +#if (NGX_THREADS) + c[i].lock = 0; +#endif } cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * ecf->connections, diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h index 87b02e13d..8e30ccddf 100644 --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -154,6 +154,11 @@ struct ngx_event_s { #endif +#if (NGX_THREADS) + ngx_atomic_t *lock; +#endif + + #if 0 /* the threads support */ diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c index bdc9f4650..984570f80 100644 --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -278,6 +278,11 @@ void ngx_event_accept(ngx_event_t *ev) c->number = ngx_atomic_inc(ngx_connection_counter); +#if (NGX_THREADS) + rev->lock = &c->lock; + wev->lock = &c->lock; +#endif + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "accept: fd:%d c:%d", s, c->number); diff --git a/src/event/ngx_event_posted.c b/src/event/ngx_event_posted.c index 96475f366..478744341 100644 --- a/src/event/ngx_event_posted.c +++ b/src/event/ngx_event_posted.c @@ -5,8 +5,10 @@ ngx_thread_volatile ngx_event_t *ngx_posted_events; + #if (NGX_THREADS) ngx_mutex_t *ngx_posted_events_mutex; +ngx_cv_t *ngx_posted_events_cv; #endif @@ -55,26 +57,70 @@ void ngx_event_process_posted(ngx_cycle_t *cycle) #if (NGX_THREADS) -void ngx_event_thread_handler(ngx_event_t *ev) +ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle) { - if ((!ev->posted && !ev->active) - || (ev->use_instance && ev->instance != ev->returned_instance)) - { - /* - * the stale event from a file descriptor - * that was just closed in this iteration - */ - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0, - "kevent: stale event " PTR_FMT, ev); - return; - } + ngx_event_t *ev, **ep; - if (ev->posted) { - ev->posted = 0; - } + for ( ;; ) { + + ev = (ngx_event_t *) ngx_posted_events; + ep = (ngx_event_t **) &ngx_posted_events; + + for ( ;; ) { + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "posted event " PTR_FMT, ev); + + if (ev == NULL) { + ngx_mutex_unlock(ngx_posted_events_mutex); + return NGX_OK; + } + + if (ngx_trylock(ev->lock) == NGX_BUSY) { + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "posted event " PTR_FMT " is busy", ev); + + ep = &ev->next; + ev = ev->next; + continue; + } - ev->event_handler(ev); + *ep = ev->next; + + if ((!ev->posted && !ev->active) + || (ev->use_instance && ev->instance != ev->returned_instance)) + { + /* + * the stale event from a file descriptor + * that was just closed in this iteration + */ + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0, + "kevent: stale event " PTR_FMT, ev); + + ev = ev->next; + + continue; + } + + ngx_mutex_unlock(ngx_posted_events_mutex); + + if (ev->posted) { + ev->posted = 0; + } + + ev->event_handler(ev); + + *(ev->lock) = 0; + + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + return NGX_ERROR; + } + + break; + } + } } #endif diff --git a/src/event/ngx_event_posted.h b/src/event/ngx_event_posted.h index 78f302e0e..b060cc6d9 100644 --- a/src/event/ngx_event_posted.h +++ b/src/event/ngx_event_posted.h @@ -14,14 +14,15 @@ void ngx_event_process_posted(ngx_cycle_t *cycle); -#if (NGX_THREADS) -void ngx_event_thread_handler(ngx_event_t *ev); -#endif - extern ngx_thread_volatile ngx_event_t *ngx_posted_events; + + #if (NGX_THREADS) +ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle); + extern ngx_mutex_t *ngx_posted_events_mutex; +extern ngx_cv_t *ngx_posted_events_cv; #endif |
