summaryrefslogtreecommitdiffhomepage
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/modules/ngx_aio_module.c2
-rw-r--r--src/event/modules/ngx_kqueue_module.c2
-rw-r--r--src/event/ngx_event.h88
-rw-r--r--src/event/ngx_event_accept.c6
-rw-r--r--src/event/ngx_event_connect.c14
-rw-r--r--src/event/ngx_event_pipe.c26
-rw-r--r--src/event/ngx_event_pipe.h4
7 files changed, 84 insertions, 58 deletions
diff --git a/src/event/modules/ngx_aio_module.c b/src/event/modules/ngx_aio_module.c
index 4b89c2a54..4ca53714b 100644
--- a/src/event/modules/ngx_aio_module.c
+++ b/src/event/modules/ngx_aio_module.c
@@ -19,7 +19,7 @@ static int ngx_aio_process_events(ngx_log_t *log);
ngx_os_io_t ngx_os_aio = {
ngx_aio_read,
- NULL,
+ ngx_aio_read_chain,
ngx_aio_write,
ngx_aio_write_chain,
NGX_HAVE_ZEROCOPY
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c
index 61c724f27..6f6ed622b 100644
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -437,7 +437,7 @@ static int ngx_kqueue_process_events(ngx_log_t *log)
break;
case EVFILT_AIO:
- ev->ready = 1;
+ ev->aio_complete = 1;
ev->active = 0;
ev->event_handler(ev);
diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h
index f07020725..1e5a63a79 100644
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -60,13 +60,19 @@ struct ngx_event_s {
unsigned instance:1;
/*
- * event was passed or would be passed to a kernel;
- * the posted aio operation.
+ * the event was passed or would be passed to a kernel;
+ * aio mode: 1 - the posted aio operation,
+ * 0 - the complete aio operation or no aio operation.
*/
unsigned active:1;
- /* ready event; the complete aio operation */
+ /*
+ * the ready event;
+ * in aio mode "ready" is always set - it makes things simple
+ * to learn whether the aio operation complete use aio_complete flag
+ */
unsigned ready:1;
+ unsigned aio_complete:1;
unsigned eof:1;
unsigned error:1;
@@ -338,15 +344,8 @@ int ngx_event_post_acceptex(ngx_listening_t *ls, int n);
-ngx_inline static int ngx_handle_read_event(ngx_event_t *rev)
+ngx_inline static int ngx_handle_read_event(ngx_event_t *rev, int close)
{
- if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) {
-
- /* aio, iocp, epoll */
-
- return NGX_OK;
- }
-
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue */
@@ -359,27 +358,32 @@ ngx_inline static int ngx_handle_read_event(ngx_event_t *rev)
}
return NGX_OK;
- }
-
- /* select, poll, /dev/poll */
- if (!rev->active && !rev->ready) {
- if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) {
- return NGX_ERROR;
- }
+ } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {
- return NGX_OK;
- }
+ /* select, poll, /dev/poll */
- if (rev->active && (rev->ready || rev->eof)) {
- if (ngx_del_event(rev, NGX_READ_EVENT, rev->eof ? NGX_CLOSE_EVENT : 0)
+ if (!rev->active && !rev->ready) {
+ if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT)
== NGX_ERROR) {
- return NGX_ERROR;
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
}
- return NGX_OK;
+ if (rev->active && (rev->ready || close)) {
+ if (ngx_del_event(rev, NGX_READ_EVENT, close ? NGX_CLOSE_EVENT : 0)
+ == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+ }
}
+ /* aio, iocp, epoll, rt signals */
+
return NGX_OK;
}
@@ -411,13 +415,6 @@ ngx_inline static int ngx_handle_level_read_event(ngx_event_t *rev)
ngx_inline static int ngx_handle_write_event(ngx_event_t *wev, int lowat)
{
- if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) {
-
- /* aio, iocp, epoll */
-
- return NGX_OK;
- }
-
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue */
@@ -437,26 +434,31 @@ ngx_inline static int ngx_handle_write_event(ngx_event_t *wev, int lowat)
}
return NGX_OK;
- }
- /* select, poll, /dev/poll */
+ } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {
- if (!wev->active && !wev->ready) {
- if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) == NGX_ERROR) {
- return NGX_ERROR;
- }
+ /* select, poll, /dev/poll */
- return NGX_OK;
- }
+ if (!wev->active && !wev->ready) {
+ if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT)
+ == NGX_ERROR) {
+ return NGX_ERROR;
+ }
- if (wev->active && wev->ready) {
- if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) == NGX_ERROR) {
- return NGX_ERROR;
+ return NGX_OK;
}
- return NGX_OK;
+ if (wev->active && wev->ready) {
+ if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+ }
}
+ /* aio, iocp, epoll, rt signals */
+
return NGX_OK;
}
diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c
index e9469753d..a4f7436e9 100644
--- a/src/event/ngx_event_accept.c
+++ b/src/event/ngx_event_accept.c
@@ -172,9 +172,11 @@ ngx_log_debug(ev->log, "ADDR %s" _ ls->listening->addr_text.data);
c->fd = s;
c->unexpected_eof = 1;
wev->write = 1;
+ wev->ready = 1;
- if ((ngx_event_flags & NGX_USE_AIO_EVENT) == 0) {
- wev->ready = 1;
+ if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_EDGE_EVENT)) {
+ /* aio, iocp, epoll */
+ rev->ready = 1;
}
c->ctx = ls->ctx;
diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c
index 16b440239..d47c5429b 100644
--- a/src/event/ngx_event_connect.c
+++ b/src/event/ngx_event_connect.c
@@ -202,7 +202,19 @@ ngx_log_debug(pc->log, "CONNECT: %s" _ peer->addr_port_text.data);
}
}
- /* TODO: epoll, aio, iocp */
+ if (ngx_event_flags & NGX_USE_AIO_EVENT) {
+ /* aio, iocp */
+ rev->ready = 1;
+
+#if 1
+ /* TODO: NGX_EINPROGRESS */
+
+ wev->ready = 1;
+ return NGX_OK;
+#endif
+ }
+
+ /* TODO: epoll */
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */
event = NGX_CLEAR_EVENT;
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 209f5cf23..d9f9abcec 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -19,6 +19,8 @@ static int ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
int ngx_event_pipe(ngx_event_pipe_t *p, int do_write)
{
+ ngx_event_t *rev, *wev;
+
for ( ;; ) {
if (do_write) {
if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) {
@@ -40,15 +42,26 @@ int ngx_event_pipe(ngx_event_pipe_t *p, int do_write)
do_write = 1;
}
- if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
+ rev = p->upstream->read;
+
+ if (ngx_handle_read_event(rev, (rev->eof || rev->error)) == NGX_ERROR) {
return NGX_ABORT;
}
- if (ngx_handle_write_event(p->downstream->write,
- /* TODO: lowat */ 0) == NGX_ERROR) {
+ if (rev->active) {
+ ngx_add_timer(rev, p->read_timeout);
+ }
+
+ wev = p->downstream->write;
+
+ if (ngx_handle_write_event(wev, p->send_lowat) == NGX_ERROR) {
return NGX_ABORT;
}
+ if (wev->active) {
+ ngx_add_timer(wev, p->send_timeout);
+ }
+
return NGX_OK;
}
@@ -112,7 +125,6 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ngx_log_error(NGX_LOG_ERR, p->log,
p->upstream->read->kq_errno,
- /* TODO: ngx_readv_chain_n */
"readv() failed");
}
@@ -213,8 +225,6 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}
- ngx_log_debug(p->log, "HUNK_FREE: %d" _ chain->hunk->num);
-
n = ngx_recv_chain(p->upstream, chain);
ngx_log_debug(p->log, "recv_chain: %d" _ n);
@@ -343,8 +353,6 @@ int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
ngx_event_pipe_free_shadow_raw_hunk(&p->free_raw_hunks,
cl->hunk);
-ngx_log_debug(p->log, "HUNK OUT: %d %x" _ cl->hunk->num _ cl->hunk->type);
-
} else if (!p->cachable && p->in) {
cl = p->in;
@@ -356,8 +364,6 @@ ngx_log_debug(p->log, "HUNK OUT: %d %x" _ cl->hunk->num _ cl->hunk->type);
p->in = p->in->next;
-ngx_log_debug(p->log, "HUNK IN: %d" _ cl->hunk->num);
-
} else {
break;
}
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
index f93adba08..f59b60e7d 100644
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -59,6 +59,10 @@ struct ngx_event_pipe_s {
ngx_connection_t *upstream;
ngx_connection_t *downstream;
+ ngx_msec_t read_timeout;
+ ngx_msec_t send_timeout;
+ ssize_t send_lowat;
+
ngx_pool_t *pool;
ngx_log_t *log;