summaryrefslogtreecommitdiffhomepage
path: root/src/event
diff options
context:
space:
mode:
authorMaxim Dounin <mdounin@mdounin.ru>2016-03-18 06:44:49 +0300
committerMaxim Dounin <mdounin@mdounin.ru>2016-03-18 06:44:49 +0300
commit348f705c000bdbfbee74d6f0111a03697f8ffa4f (patch)
tree257c01008977e51e7dd44b0d69d1795d4bb362ce /src/event
parent10c8c8d6a47db5e84825479438ce5848b2d1dda4 (diff)
downloadnginx-348f705c000bdbfbee74d6f0111a03697f8ffa4f.tar.gz
nginx-348f705c000bdbfbee74d6f0111a03697f8ffa4f.tar.bz2
Threads: writing via threads pools in event pipe.
The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
Diffstat (limited to 'src/event')
-rw-r--r--src/event/ngx_event_pipe.c98
-rw-r--r--src/event/ngx_event_pipe.h10
2 files changed, 88 insertions, 20 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 2d0e7d35e..ee86c7e76 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
return NGX_OK;
}
+#if (NGX_THREADS)
+ if (p->aio) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe read upstream: aio");
+ return NGX_AGAIN;
+ }
+#endif
+
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);
@@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}
- if (rc == NGX_AGAIN) {
- if (ngx_event_flags & NGX_USE_LEVEL_EVENT
- && p->upstream->read->active
- && p->upstream->read->ready)
- {
- if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
- == NGX_ERROR)
- {
- return NGX_ABORT;
- }
- }
- }
-
if (rc != NGX_OK) {
return rc;
}
@@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");
- if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
- return NGX_ABORT;
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc != NGX_OK) {
+ return rc;
}
}
@@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
+#if (NGX_THREADS)
+
+ if (p->writing) {
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc == NGX_ABORT) {
+ return NGX_ABORT;
+ }
+ }
+
+#endif
+
flushed = 0;
for ( ;; ) {
@@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = NULL;
}
+ if (p->writing) {
+ break;
+ }
+
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
@@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = p->out->next;
- } else if (!p->cacheable && p->in) {
+ } else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
- ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+ ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+ if (p->writing) {
+
+ if (p->aio) {
+ return NGX_AGAIN;
+ }
+
+ out = p->writing;
+ p->writing = NULL;
+
+ n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+ if (n == NGX_ERROR) {
+ return NGX_ABORT;
+ }
+
+ goto done;
+ }
+
+#endif
if (p->buf_to_file) {
- fl.buf = p->buf_to_file;
- fl.next = p->in;
- out = &fl;
+ out = ngx_alloc_chain_link(p->pool);
+ if (out == NULL) {
+ return NGX_ABORT;
+ }
+
+ out->buf = p->buf_to_file;
+ out->next = p->in;
} else {
out = p->in;
@@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
p->last_in = &p->in;
}
+#if (NGX_THREADS)
+ p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+ p->temp_file->file.thread_task = p->thread_task;
+ p->temp_file->file.thread_handler = p->thread_handler;
+ p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
n = ngx_write_chain_to_temp_file(p->temp_file, out);
if (n == NGX_ERROR) {
return NGX_ABORT;
}
+#if (NGX_THREADS)
+
+ if (n == NGX_AGAIN) {
+ p->writing = out;
+ p->thread_task = p->temp_file->file.thread_task;
+ return NGX_AGAIN;
+ }
+
+done:
+
+#endif
+
if (p->buf_to_file) {
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
index 451fc4c05..ef2e7a006 100644
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -30,6 +30,8 @@ struct ngx_event_pipe_s {
ngx_chain_t *in;
ngx_chain_t **last_in;
+ ngx_chain_t *writing;
+
ngx_chain_t *out;
ngx_chain_t *free;
ngx_chain_t *busy;
@@ -45,6 +47,13 @@ struct ngx_event_pipe_s {
ngx_event_pipe_output_filter_pt output_filter;
void *output_ctx;
+#if (NGX_THREADS)
+ ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
+ ngx_file_t *file);
+ void *thread_ctx;
+ ngx_thread_task_t *thread_task;
+#endif
+
unsigned read:1;
unsigned cacheable:1;
unsigned single_buf:1;
@@ -56,6 +65,7 @@ struct ngx_event_pipe_s {
unsigned downstream_done:1;
unsigned downstream_error:1;
unsigned cyclic_temp_file:1;
+ unsigned aio:1;
ngx_int_t allocated;
ngx_bufs_t bufs;