summaryrefslogtreecommitdiffhomepage
path: root/src/http
diff options
context:
space:
mode:
authorMaxim Dounin <mdounin@mdounin.ru>2016-03-18 06:44:49 +0300
committerMaxim Dounin <mdounin@mdounin.ru>2016-03-18 06:44:49 +0300
commit348f705c000bdbfbee74d6f0111a03697f8ffa4f (patch)
tree257c01008977e51e7dd44b0d69d1795d4bb362ce /src/http
parent10c8c8d6a47db5e84825479438ce5848b2d1dda4 (diff)
downloadnginx-348f705c000bdbfbee74d6f0111a03697f8ffa4f.tar.gz
nginx-348f705c000bdbfbee74d6f0111a03697f8ffa4f.tar.bz2
Threads: writing via threads pools in event pipe.
The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
Diffstat (limited to 'src/http')
-rw-r--r--src/http/ngx_http_core_module.c9
-rw-r--r--src/http/ngx_http_core_module.h1
-rw-r--r--src/http/ngx_http_upstream.c117
3 files changed, 126 insertions, 1 deletions
diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c
index 7b70a3f1f..1ce1e23ea 100644
--- a/src/http/ngx_http_core_module.c
+++ b/src/http/ngx_http_core_module.c
@@ -402,6 +402,13 @@ static ngx_command_t ngx_http_core_commands[] = {
0,
NULL },
+ { ngx_string("aio_write"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_core_loc_conf_t, aio_write),
+ NULL },
+
{ ngx_string("read_ahead"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
@@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf)
clcf->sendfile = NGX_CONF_UNSET;
clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
clcf->aio = NGX_CONF_UNSET;
+ clcf->aio_write = NGX_CONF_UNSET;
#if (NGX_THREADS)
clcf->thread_pool = NGX_CONF_UNSET_PTR;
clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
@@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
prev->sendfile_max_chunk, 0);
#if (NGX_HAVE_FILE_AIO || NGX_THREADS)
ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
+ ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
#endif
#if (NGX_THREADS)
ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
diff --git a/src/http/ngx_http_core_module.h b/src/http/ngx_http_core_module.h
index dd434e4fa..961de3f9a 100644
--- a/src/http/ngx_http_core_module.h
+++ b/src/http/ngx_http_core_module.h
@@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s {
ngx_flag_t internal; /* internal */
ngx_flag_t sendfile; /* sendfile */
ngx_flag_t aio; /* aio */
+ ngx_flag_t aio_write; /* aio_write */
ngx_flag_t tcp_nopush; /* tcp_nopush */
ngx_flag_t tcp_nodelay; /* tcp_nodelay */
ngx_flag_t reset_timedout_connection; /* reset_timedout_connection */
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
index 4df485a99..67bd38333 100644
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -76,6 +76,13 @@ static void
static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
ssize_t bytes);
+#if (NGX_THREADS)
+static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
+ ngx_file_t *file);
+static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
+#endif
+static ngx_int_t ngx_http_upstream_output_filter(void *data,
+ ngx_chain_t *chain);
static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u);
@@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p = u->pipe;
- p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
+ p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
@@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;
+#if (NGX_THREADS)
+ if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
+ p->thread_handler = ngx_http_upstream_thread_handler;
+ p->thread_ctx = r;
+ }
+#endif
+
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
@@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
}
+#if (NGX_THREADS)
+
+static ngx_int_t
+ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
+{
+ ngx_str_t name;
+ ngx_event_pipe_t *p;
+ ngx_thread_pool_t *tp;
+ ngx_http_request_t *r;
+ ngx_http_core_loc_conf_t *clcf;
+
+ r = file->thread_ctx;
+ p = r->upstream->pipe;
+
+ clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+ tp = clcf->thread_pool;
+
+ if (tp == NULL) {
+ if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
+
+ if (tp == NULL) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "thread pool \"%V\" not found", &name);
+ return NGX_ERROR;
+ }
+ }
+
+ task->event.data = r;
+ task->event.handler = ngx_http_upstream_thread_event_handler;
+
+ if (ngx_thread_task_post(tp, task) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ r->main->blocked++;
+ r->aio = 1;
+ p->aio = 1;
+
+ return NGX_OK;
+}
+
+
+static void
+ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_http_request_t *r;
+
+ r = ev->data;
+ c = r->connection;
+
+ ngx_http_set_log_request(c->log, r);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http upstream thread: \"%V?%V\"", &r->uri, &r->args);
+
+ r->main->blocked--;
+ r->aio = 0;
+
+ r->write_event_handler(r);
+
+ ngx_http_run_posted_requests(c);
+}
+
+#endif
+
+
+static ngx_int_t
+ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
+{
+ ngx_int_t rc;
+ ngx_event_pipe_t *p;
+ ngx_http_request_t *r;
+
+ r = data;
+ p = r->upstream->pipe;
+
+ rc = ngx_http_output_filter(r, chain);
+
+ p->aio = r->aio;
+
+ return rc;
+}
+
+
static void
ngx_http_upstream_process_downstream(ngx_http_request_t *r)
{
@@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r)
c->log->action = "sending to client";
+#if (NGX_THREADS)
+ p->aio = r->aio;
+#endif
+
if (wev->timedout) {
if (wev->delayed) {
@@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r,
p = u->pipe;
+#if (NGX_THREADS)
+ if (p->writing) {
+ return;
+ }
+#endif
+
if (u->peer.connection) {
if (u->store) {