diff options
Diffstat (limited to 'src/http')
| -rw-r--r-- | src/http/ngx_http_core_module.c | 9 | ||||
| -rw-r--r-- | src/http/ngx_http_core_module.h | 1 | ||||
| -rw-r--r-- | src/http/ngx_http_upstream.c | 117 |
3 files changed, 126 insertions, 1 deletions
diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c index 7b70a3f1f..1ce1e23ea 100644 --- a/src/http/ngx_http_core_module.c +++ b/src/http/ngx_http_core_module.c @@ -402,6 +402,13 @@ static ngx_command_t ngx_http_core_commands[] = { 0, NULL }, + { ngx_string("aio_write"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_core_loc_conf_t, aio_write), + NULL }, + { ngx_string("read_ahead"), NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, ngx_conf_set_size_slot, @@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf) clcf->sendfile = NGX_CONF_UNSET; clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE; clcf->aio = NGX_CONF_UNSET; + clcf->aio_write = NGX_CONF_UNSET; #if (NGX_THREADS) clcf->thread_pool = NGX_CONF_UNSET_PTR; clcf->thread_pool_value = NGX_CONF_UNSET_PTR; @@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) prev->sendfile_max_chunk, 0); #if (NGX_HAVE_FILE_AIO || NGX_THREADS) ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF); + ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0); #endif #if (NGX_THREADS) ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL); diff --git a/src/http/ngx_http_core_module.h b/src/http/ngx_http_core_module.h index dd434e4fa..961de3f9a 100644 --- a/src/http/ngx_http_core_module.h +++ b/src/http/ngx_http_core_module.h @@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s { ngx_flag_t internal; /* internal */ ngx_flag_t sendfile; /* sendfile */ ngx_flag_t aio; /* aio */ + ngx_flag_t aio_write; /* aio_write */ ngx_flag_t tcp_nopush; /* tcp_nopush */ ngx_flag_t tcp_nodelay; /* tcp_nodelay */ ngx_flag_t reset_timedout_connection; /* reset_timedout_connection */ diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c index 4df485a99..67bd38333 100644 --- a/src/http/ngx_http_upstream.c +++ b/src/http/ngx_http_upstream.c @@ -76,6 +76,13 @@ static void static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data); static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes); +#if (NGX_THREADS) +static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task, + ngx_file_t *file); +static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev); +#endif +static ngx_int_t ngx_http_upstream_output_filter(void *data, + ngx_chain_t *chain); static void ngx_http_upstream_process_downstream(ngx_http_request_t *r); static void ngx_http_upstream_process_upstream(ngx_http_request_t *r, ngx_http_upstream_t *u); @@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) p = u->pipe; - p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; + p->output_filter = ngx_http_upstream_output_filter; p->output_ctx = r; p->tag = u->output.tag; p->bufs = u->conf->bufs; @@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; +#if (NGX_THREADS) + if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) { + p->thread_handler = ngx_http_upstream_thread_handler; + p->thread_ctx = r; + } +#endif + p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); @@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes) } +#if (NGX_THREADS) + +static ngx_int_t +ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file) +{ + ngx_str_t name; + ngx_event_pipe_t *p; + ngx_thread_pool_t *tp; + ngx_http_request_t *r; + ngx_http_core_loc_conf_t *clcf; + + r = file->thread_ctx; + p = r->upstream->pipe; + + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + tp = clcf->thread_pool; + + if (tp == NULL) { + if (ngx_http_complex_value(r, clcf->thread_pool_value, &name) + != NGX_OK) + { + return NGX_ERROR; + } + + tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name); + + if (tp == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "thread pool \"%V\" not found", &name); + return NGX_ERROR; + } + } + + task->event.data = r; + task->event.handler = ngx_http_upstream_thread_event_handler; + + if (ngx_thread_task_post(tp, task) != NGX_OK) { + return NGX_ERROR; + } + + r->main->blocked++; + r->aio = 1; + p->aio = 1; + + return NGX_OK; +} + + +static void +ngx_http_upstream_thread_event_handler(ngx_event_t *ev) +{ + ngx_connection_t *c; + ngx_http_request_t *r; + + r = ev->data; + c = r->connection; + + ngx_http_set_log_request(c->log, r); + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, + "http upstream thread: \"%V?%V\"", &r->uri, &r->args); + + r->main->blocked--; + r->aio = 0; + + r->write_event_handler(r); + + ngx_http_run_posted_requests(c); +} + +#endif + + +static ngx_int_t +ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain) +{ + ngx_int_t rc; + ngx_event_pipe_t *p; + ngx_http_request_t *r; + + r = data; + p = r->upstream->pipe; + + rc = ngx_http_output_filter(r, chain); + + p->aio = r->aio; + + return rc; +} + + static void ngx_http_upstream_process_downstream(ngx_http_request_t *r) { @@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r) c->log->action = "sending to client"; +#if (NGX_THREADS) + p->aio = r->aio; +#endif + if (wev->timedout) { if (wev->delayed) { @@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r, p = u->pipe; +#if (NGX_THREADS) + if (p->writing) { + return; + } +#endif + if (u->peer.connection) { if (u->store) { |
