diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/modules/ngx_aio_module.c | 2 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 2 | ||||
| -rw-r--r-- | src/event/ngx_event.h | 88 | ||||
| -rw-r--r-- | src/event/ngx_event_accept.c | 6 | ||||
| -rw-r--r-- | src/event/ngx_event_connect.c | 14 | ||||
| -rw-r--r-- | src/event/ngx_event_pipe.c | 26 | ||||
| -rw-r--r-- | src/event/ngx_event_pipe.h | 4 |
7 files changed, 84 insertions, 58 deletions
diff --git a/src/event/modules/ngx_aio_module.c b/src/event/modules/ngx_aio_module.c index 4b89c2a54..4ca53714b 100644 --- a/src/event/modules/ngx_aio_module.c +++ b/src/event/modules/ngx_aio_module.c @@ -19,7 +19,7 @@ static int ngx_aio_process_events(ngx_log_t *log); ngx_os_io_t ngx_os_aio = { ngx_aio_read, - NULL, + ngx_aio_read_chain, ngx_aio_write, ngx_aio_write_chain, NGX_HAVE_ZEROCOPY diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index 61c724f27..6f6ed622b 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -437,7 +437,7 @@ static int ngx_kqueue_process_events(ngx_log_t *log) break; case EVFILT_AIO: - ev->ready = 1; + ev->aio_complete = 1; ev->active = 0; ev->event_handler(ev); diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h index f07020725..1e5a63a79 100644 --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -60,13 +60,19 @@ struct ngx_event_s { unsigned instance:1; /* - * event was passed or would be passed to a kernel; - * the posted aio operation. + * the event was passed or would be passed to a kernel; + * aio mode: 1 - the posted aio operation, + * 0 - the complete aio operation or no aio operation. */ unsigned active:1; - /* ready event; the complete aio operation */ + /* + * the ready event; + * in aio mode "ready" is always set - it makes things simple + * to learn whether the aio operation complete use aio_complete flag + */ unsigned ready:1; + unsigned aio_complete:1; unsigned eof:1; unsigned error:1; @@ -338,15 +344,8 @@ int ngx_event_post_acceptex(ngx_listening_t *ls, int n); -ngx_inline static int ngx_handle_read_event(ngx_event_t *rev) +ngx_inline static int ngx_handle_read_event(ngx_event_t *rev, int close) { - if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) { - - /* aio, iocp, epoll */ - - return NGX_OK; - } - if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ @@ -359,27 +358,32 @@ ngx_inline static int ngx_handle_read_event(ngx_event_t *rev) } return NGX_OK; - } - - /* select, poll, /dev/poll */ - if (!rev->active && !rev->ready) { - if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) { - return NGX_ERROR; - } + } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) { - return NGX_OK; - } + /* select, poll, /dev/poll */ - if (rev->active && (rev->ready || rev->eof)) { - if (ngx_del_event(rev, NGX_READ_EVENT, rev->eof ? NGX_CLOSE_EVENT : 0) + if (!rev->active && !rev->ready) { + if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) { - return NGX_ERROR; + return NGX_ERROR; + } + + return NGX_OK; } - return NGX_OK; + if (rev->active && (rev->ready || close)) { + if (ngx_del_event(rev, NGX_READ_EVENT, close ? NGX_CLOSE_EVENT : 0) + == NGX_ERROR) { + return NGX_ERROR; + } + + return NGX_OK; + } } + /* aio, iocp, epoll, rt signals */ + return NGX_OK; } @@ -411,13 +415,6 @@ ngx_inline static int ngx_handle_level_read_event(ngx_event_t *rev) ngx_inline static int ngx_handle_write_event(ngx_event_t *wev, int lowat) { - if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) { - - /* aio, iocp, epoll */ - - return NGX_OK; - } - if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ @@ -437,26 +434,31 @@ ngx_inline static int ngx_handle_write_event(ngx_event_t *wev, int lowat) } return NGX_OK; - } - /* select, poll, /dev/poll */ + } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) { - if (!wev->active && !wev->ready) { - if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) { - return NGX_ERROR; - } + /* select, poll, /dev/poll */ - return NGX_OK; - } + if (!wev->active && !wev->ready) { + if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) + == NGX_ERROR) { + return NGX_ERROR; + } - if (wev->active && wev->ready) { - if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) == NGX_ERROR) { - return NGX_ERROR; + return NGX_OK; } - return NGX_OK; + if (wev->active && wev->ready) { + if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) == NGX_ERROR) { + return NGX_ERROR; + } + + return NGX_OK; + } } + /* aio, iocp, epoll, rt signals */ + return NGX_OK; } diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c index e9469753d..a4f7436e9 100644 --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -172,9 +172,11 @@ ngx_log_debug(ev->log, "ADDR %s" _ ls->listening->addr_text.data); c->fd = s; c->unexpected_eof = 1; wev->write = 1; + wev->ready = 1; - if ((ngx_event_flags & NGX_USE_AIO_EVENT) == 0) { - wev->ready = 1; + if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) { + /* aio, iocp, epoll */ + rev->ready = 1; } c->ctx = ls->ctx; diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c index 16b440239..d47c5429b 100644 --- a/src/event/ngx_event_connect.c +++ b/src/event/ngx_event_connect.c @@ -202,7 +202,19 @@ ngx_log_debug(pc->log, "CONNECT: %s" _ peer->addr_port_text.data); } } - /* TODO: epoll, aio, iocp */ + if (ngx_event_flags & NGX_USE_AIO_EVENT) { + /* aio, iocp */ + rev->ready = 1; + +#if 1 + /* TODO: NGX_EINPROGRESS */ + + wev->ready = 1; + return NGX_OK; +#endif + } + + /* TODO: epoll */ if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ event = NGX_CLEAR_EVENT; diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c index 209f5cf23..d9f9abcec 100644 --- a/src/event/ngx_event_pipe.c +++ b/src/event/ngx_event_pipe.c @@ -19,6 +19,8 @@ static int ngx_event_pipe_drain_chains(ngx_event_pipe_t *p); int ngx_event_pipe(ngx_event_pipe_t *p, int do_write) { + ngx_event_t *rev, *wev; + for ( ;; ) { if (do_write) { if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) { @@ -40,15 +42,26 @@ int ngx_event_pipe(ngx_event_pipe_t *p, int do_write) do_write = 1; } - if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) { + rev = p->upstream->read; + + if (ngx_handle_read_event(rev, (rev->eof || rev->error)) == NGX_ERROR) { return NGX_ABORT; } - if (ngx_handle_write_event(p->downstream->write, - /* TODO: lowat */ 0) == NGX_ERROR) { + if (rev->active) { + ngx_add_timer(rev, p->read_timeout); + } + + wev = p->downstream->write; + + if (ngx_handle_write_event(wev, p->send_lowat) == NGX_ERROR) { return NGX_ABORT; } + if (wev->active) { + ngx_add_timer(wev, p->send_timeout); + } + return NGX_OK; } @@ -112,7 +125,6 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->kq_errno, - /* TODO: ngx_readv_chain_n */ "readv() failed"); } @@ -213,8 +225,6 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) break; } - ngx_log_debug(p->log, "HUNK_FREE: %d" _ chain->hunk->num); - n = ngx_recv_chain(p->upstream, chain); ngx_log_debug(p->log, "recv_chain: %d" _ n); @@ -343,8 +353,6 @@ int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p) ngx_event_pipe_free_shadow_raw_hunk(&p->free_raw_hunks, cl->hunk); -ngx_log_debug(p->log, "HUNK OUT: %d %x" _ cl->hunk->num _ cl->hunk->type); - } else if (!p->cachable && p->in) { cl = p->in; @@ -356,8 +364,6 @@ ngx_log_debug(p->log, "HUNK OUT: %d %x" _ cl->hunk->num _ cl->hunk->type); p->in = p->in->next; -ngx_log_debug(p->log, "HUNK IN: %d" _ cl->hunk->num); - } else { break; } diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h index f93adba08..f59b60e7d 100644 --- a/src/event/ngx_event_pipe.h +++ b/src/event/ngx_event_pipe.h @@ -59,6 +59,10 @@ struct ngx_event_pipe_s { ngx_connection_t *upstream; ngx_connection_t *downstream; + ngx_msec_t read_timeout; + ngx_msec_t send_timeout; + ssize_t send_lowat; + ngx_pool_t *pool; ngx_log_t *log; |
