summaryrefslogtreecommitdiffhomepage
path: root/src/event/ngx_event_pipe.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/event/ngx_event_pipe.c98
1 files changed, 78 insertions, 20 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 2d0e7d35e..ee86c7e76 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
return NGX_OK;
}
+#if (NGX_THREADS)
+ if (p->aio) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe read upstream: aio");
+ return NGX_AGAIN;
+ }
+#endif
+
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);
@@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}
- if (rc == NGX_AGAIN) {
- if (ngx_event_flags & NGX_USE_LEVEL_EVENT
- && p->upstream->read->active
- && p->upstream->read->ready)
- {
- if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
- == NGX_ERROR)
- {
- return NGX_ABORT;
- }
- }
- }
-
if (rc != NGX_OK) {
return rc;
}
@@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");
- if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
- return NGX_ABORT;
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc != NGX_OK) {
+ return rc;
}
}
@@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
+#if (NGX_THREADS)
+
+ if (p->writing) {
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc == NGX_ABORT) {
+ return NGX_ABORT;
+ }
+ }
+
+#endif
+
flushed = 0;
for ( ;; ) {
@@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = NULL;
}
+ if (p->writing) {
+ break;
+ }
+
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
@@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = p->out->next;
- } else if (!p->cacheable && p->in) {
+ } else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
- ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+ ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+ if (p->writing) {
+
+ if (p->aio) {
+ return NGX_AGAIN;
+ }
+
+ out = p->writing;
+ p->writing = NULL;
+
+ n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+ if (n == NGX_ERROR) {
+ return NGX_ABORT;
+ }
+
+ goto done;
+ }
+
+#endif
if (p->buf_to_file) {
- fl.buf = p->buf_to_file;
- fl.next = p->in;
- out = &fl;
+ out = ngx_alloc_chain_link(p->pool);
+ if (out == NULL) {
+ return NGX_ABORT;
+ }
+
+ out->buf = p->buf_to_file;
+ out->next = p->in;
} else {
out = p->in;
@@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
p->last_in = &p->in;
}
+#if (NGX_THREADS)
+ p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+ p->temp_file->file.thread_task = p->thread_task;
+ p->temp_file->file.thread_handler = p->thread_handler;
+ p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
n = ngx_write_chain_to_temp_file(p->temp_file, out);
if (n == NGX_ERROR) {
return NGX_ABORT;
}
+#if (NGX_THREADS)
+
+ if (n == NGX_AGAIN) {
+ p->writing = out;
+ p->thread_task = p->temp_file->file.thread_task;
+ return NGX_AGAIN;
+ }
+
+done:
+
+#endif
+
if (p->buf_to_file) {
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;