diff options
| author | Igor Sysoev <igor@sysoev.ru> | 2003-10-21 16:49:56 +0000 |
|---|---|---|
| committer | Igor Sysoev <igor@sysoev.ru> | 2003-10-21 16:49:56 +0000 |
| commit | 419f9aceb4d994c2f7f51400f59fb2da0ed666d4 (patch) | |
| tree | e32162d1d3833491fc1a95780880099f2ef6857f /src/event/ngx_event_pipe.c | |
| parent | 9760a1336f0eb4057b6e2ccdd4b0145087102b17 (diff) | |
| download | nginx-419f9aceb4d994c2f7f51400f59fb2da0ed666d4.tar.gz nginx-419f9aceb4d994c2f7f51400f59fb2da0ed666d4.tar.bz2 | |
nginx-0.0.1-2003-10-21-20:49:56 import
Diffstat (limited to 'src/event/ngx_event_pipe.c')
| -rw-r--r-- | src/event/ngx_event_pipe.c | 606 |
1 files changed, 606 insertions, 0 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c new file mode 100644 index 000000000..fdaba3f7d --- /dev/null +++ b/src/event/ngx_event_pipe.c @@ -0,0 +1,606 @@ + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_event.h> +#include <ngx_event_pipe.h> + + +static int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p); +static int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p); + +static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p); +ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk); +ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free, + ngx_hunk_t *h); +ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain, + ngx_chain_t *ce); +static int ngx_drain_chains(ngx_event_pipe_t *p); + + +int ngx_event_pipe(ngx_event_pipe_t *p, int do_write) +{ + for ( ;; ) { + if (do_write) { + if (ngx_event_pipe_write_to_downstream(p) == NGX_ABORT) { + return NGX_ABORT; + } + } + + p->read = 0; + + if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { + return NGX_ABORT; + } + + if (!p->read) { + break; + } + + do_write = 1; + } + + if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) { + return NGX_ABORT; + } + + if (ngx_handle_write_event(p->downstream->write, + /* TODO: lowat */ 0) == NGX_ERROR) { + return NGX_ABORT; + } + + return NGX_OK; +} + + +int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) +{ + int n, rc, size; + ngx_hunk_t *h; + ngx_chain_t *chain, *ce, *te; + + if (p->upstream_eof || p->upstream_error || p->upstream_done) { + return NGX_OK; + } + + ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready); + + for ( ;; ) { + + if (p->upstream_eof || p->upstream_error || p->upstream_done) { + break; + } + + if (p->preread_hunks == NULL && !p->upstream->read->ready) { + break; + } + + if (p->preread_hunks) { + + /* use the pre-read hunks if they exist */ + + p->read = 1; + chain = p->preread_hunks; + p->preread_hunks = NULL; + n = p->preread_size; + + ngx_log_debug(p->log, "preread: %d" _ n); + + } else { + +#if (HAVE_KQUEUE) + + /* + * kqueue notifies about the end of file or a pending error. + * This test allows not to allocate a hunk on these conditions + * and not to call ngx_recv_chain(). + */ + + if (ngx_event_flags == NGX_HAVE_KQUEUE_EVENT) { + + if (p->upstream->read->error) { + ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error, + "readv() failed"); + p->upstream_error = 1; + + break; + + } else if (p->upstream->read->eof + && p->upstream->read->available == 0) { + p->upstream_eof = 1; + p->read = 1; + + break; + } + } +#endif + + if (p->free_raw_hunks) { + + /* use the free hunks if they exist */ + + chain = p->free_raw_hunks; + p->free_raw_hunks = NULL; + + } else if (p->hunks < p->bufs.num) { + + /* allocate a new hunk if it's still allowed */ + + ngx_test_null(h, ngx_create_temp_hunk(p->pool, + p->bufs.size, 0, 0), + NGX_ABORT); + p->hunks++; + + ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); + chain = te; + + } else if (!p->cachable && p->downstream->write->ready) { + + /* + * if the hunks are not needed to be saved in a cache and + * a downstream is ready then write the hunks to a downstream + */ + + ngx_log_debug(p->log, "downstream ready"); + + break; + + } else if (p->cachable || p->temp_offset < p->max_temp_file_size) { + + /* + * if it's allowed then save some hunks from r->in + * to a temporary file, and add them to a r->out chain + */ + + rc = ngx_event_pipe_write_chain_to_temp_file(p); + + ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset); + + if (rc == NGX_AGAIN) { + if (ngx_event_flags & NGX_USE_LEVEL_EVENT + && p->upstream->read->active + && p->upstream->read->ready) + { + if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) + == NGX_ERROR) + { + return NGX_ABORT; + } + } + } + + if (rc != NGX_OK) { + return rc; + } + + chain = p->free_raw_hunks; + p->free_raw_hunks = NULL; + + } else { + + /* if there're no hunks to read in then disable a level event */ + + ngx_log_debug(p->log, "no hunks to read in"); + + break; + } + + n = ngx_recv_chain(p->upstream, chain); + + ngx_log_debug(p->log, "recv_chain: %d" _ n); + + p->free_raw_hunks = chain; + + if (n == NGX_ERROR) { + p->upstream_error = 1; + return NGX_ERROR; + } + + if (n == NGX_AGAIN) { + break; + } + + p->read = 1; + + if (n == 0) { + p->upstream_eof = 1; + break; + } + } + + ce = chain; + + while (ce && n > 0) { + + ngx_remove_shadow_links(ce->hunk); + + size = ce->hunk->end - ce->hunk->last; + + if (n >= size) { + ce->hunk->last = ce->hunk->end; + + if (p->input_filter(p, ce->hunk) == NGX_ERROR) { + return NGX_ABORT; + } + + n -= size; + ce = ce->next; + + } else { + ce->hunk->last += n; + n = 0; + } + } + + p->free_raw_hunks = ce; + } + + if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) { + if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) { + return NGX_ABORT; + } + + /* TODO: p->free_raw_hunk->next can be free()ed */ + p->free_raw_hunks = p->free_raw_hunks->next; + } + + if (p->cachable && p->in) { + if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { + return NGX_ABORT; + } + } + + return NGX_OK; +} + + +int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p) +{ + size_t busy_len; + ngx_hunk_t *h; + ngx_chain_t *out, **le, *ce, *te; + + ngx_log_debug(p->log, "write downstream: %d" _ p->downstream->write->ready); + + for ( ;; ) { + if (p->downstream_error) { + return ngx_drain_chains(p); + } + + if ((p->upstream_eof || p->upstream_error || p->upstream_done) + && p->out == NULL && p->in == NULL) + { + p->downstream_done = 1; + break; + } + + if (!p->downstream->write->ready) { + break; + } + + busy_len = 0; + + if (!(p->upstream_eof || p->upstream_error || p->upstream_done)) { + /* calculate p->busy_len */ + for (ce = p->busy; ce; ce = ce->next) { + busy_len += ngx_hunk_size(ce->hunk); + } + } + + out = NULL; + le = NULL; + + for ( ;; ) { + if (p->out) { + ce = p->out; + + if (!(p->upstream_eof || p->upstream_error || p->upstream_done) + && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len)) + { + break; + } + + p->out = p->out->next; + ngx_remove_shadow_free_raw_hunk(&p->free_raw_hunks, ce->hunk); + + } else if (!p->cachable && p->in) { + ce = p->in; + + if (!(p->upstream_eof || p->upstream_error || p->upstream_done) + && (busy_len + ngx_hunk_size(ce->hunk) > p->max_busy_len)) + { + break; + } + + p->in = p->in->next; + + } else { + break; + } + + busy_len += ngx_hunk_size(ce->hunk); + ce->next = NULL; + ngx_chain_add_ce(out, le, ce); + } + + if (out == NULL) { + break; + } + + if (p->output_filter(p->output_ctx, out) == NGX_ERROR) { + p->downstream_error = 1; + continue; + } + + ngx_chain_update_chains(&p->free, &p->busy, &out); + + /* add the free shadow raw hunks to p->free_raw_hunks */ + + for (ce = p->free; ce; ce = ce->next) { + if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) { + h = ce->hunk->shadow; + /* THINK NEEDED ??? */ h->pos = h->last = h->start; + h->shadow = NULL; + ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); + ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te); + + ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW; + } + ce->hunk->shadow = NULL; + } + } + + return NGX_OK; +} + + +static int ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) +{ + int rc, size, hunk_size; + ngx_hunk_t *h; + ngx_chain_t *ce, *te, *next, *out, **le, **last_free; + + ngx_log_debug(p->log, "write to file"); + + if (p->temp_file->fd == NGX_INVALID_FILE) { + rc = ngx_create_temp_file(p->temp_file, p->temp_path, p->pool, + p->cachable); + + if (rc == NGX_ERROR) { + return NGX_ABORT; + } + + if (rc == NGX_AGAIN) { + return NGX_AGAIN; + } + + if (!p->cachable && p->temp_file_warn) { + ngx_log_error(NGX_LOG_WARN, p->log, 0, p->temp_file_warn); + } + } + + out = p->in; + + if (!p->cachable) { + + size = 0; + ce = p->in; + le = NULL; + +ngx_log_debug(p->log, "offset: %d" _ p->temp_offset); + + do { + hunk_size = ce->hunk->last - ce->hunk->pos; + +ngx_log_debug(p->log, "hunk size: %d" _ hunk_size); + + if ((size + hunk_size > p->temp_file_write_size) + || (p->temp_offset + hunk_size > p->max_temp_file_size)) + { + break; + } + + size += hunk_size; + le = &ce->next; + ce = ce->next; + + } while (ce); + +ngx_log_debug(p->log, "size: %d" _ size); + + if (ce) { + p->in = ce; + *le = NULL; + + } else { + p->in = NULL; + p->last_in = &p->in; + } + + } else { + p->in = NULL; + p->last_in = &p->in; + } + + if (ngx_write_chain_to_file(p->temp_file, out, p->temp_offset, + p->pool) == NGX_ERROR) { + return NGX_ABORT; + } + + for (last_free = &p->free_raw_hunks; + *last_free != NULL; + last_free = &(*last_free)->next) + { + /* void */ + } + + for (ce = out; ce; ce = next) { + next = ce->next; + ce->next = NULL; + + h = ce->hunk; + h->type |= NGX_HUNK_FILE; + h->file = p->temp_file; + h->file_pos = p->temp_offset; + p->temp_offset += h->last - h->pos; + h->file_last = p->temp_offset; + + ngx_chain_add_ce(p->out, p->last_out, ce); + + if (h->type & NGX_HUNK_LAST_SHADOW) { + h->shadow->last = h->shadow->pos = h->shadow->start; + ngx_alloc_ce_and_set_hunk(te, h->shadow, p->pool, NGX_ABORT); + *last_free = te; + last_free = &te->next; + } + } + + return NGX_OK; +} + + +/* the copy input filter */ + +int ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_hunk_t *hunk) +{ + ngx_hunk_t *h; + ngx_chain_t *ce; + + if (hunk->pos == hunk->last) { + return NGX_OK; + } + + if (p->free) { + h = p->free->hunk; + p->free = p->free->next; + + } else { + ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR); + } + + ngx_memcpy(h, hunk, sizeof(ngx_hunk_t)); + h->shadow = hunk; + h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED; + hunk->shadow = h; + + ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR); + ngx_chain_add_ce(p->in, p->last_in, ce); + + return NGX_OK; +} + + +ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk) +{ + ngx_hunk_t *h, *next; + + if (hunk->shadow == NULL) { + return; + } + + h = hunk->shadow; + + while (!(h->type & NGX_HUNK_LAST_SHADOW)) { + next = h->shadow; + h->type &= ~(NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED); + h->shadow = NULL; + h = next; + } + + h->type &= ~(NGX_HUNK_TEMP + |NGX_HUNK_IN_MEMORY + |NGX_HUNK_RECYCLED + |NGX_HUNK_LAST_SHADOW); + h->shadow = NULL; + + hunk->shadow = NULL; +} + + +ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free, + ngx_hunk_t *h) +{ + ngx_hunk_t *s; + ngx_chain_t *ce, **le; + + if (h->shadow == NULL) { + return; + } + + for (s = h->shadow; !(s->type & NGX_HUNK_LAST_SHADOW); s = s->shadow) { + /* void */ + } + + le = free; + + for (ce = *free ; ce; ce = ce->next) { + if (ce->hunk == s) { + *le = ce->next; + break; + } + + if (ce->hunk->shadow) { + break; + } + + le = &ce->next; + } +} + + +ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain, + ngx_chain_t *ce) +{ + if (*chain == NULL) { + *chain = ce; + return; + } + + if ((*chain)->hunk->pos != (*chain)->hunk->last) { + ce->next = (*chain)->next; + (*chain)->next = ce; + + } else { + ce->next = (*chain); + (*chain) = ce; + } +} + + +static int ngx_drain_chains(ngx_event_pipe_t *p) +{ + ngx_hunk_t *h; + ngx_chain_t *ce, *te; + + for ( ;; ) { + if (p->busy) { + ce = p->busy; + + } else if (p->out) { + ce = p->out; + + } else if (p->in) { + ce = p->in; + + } else { + return NGX_OK; + } + + while (ce) { + if (ce->hunk->type & NGX_HUNK_LAST_SHADOW) { + h = ce->hunk->shadow; + /* THINK NEEDED ??? */ h->pos = h->last = h->start; + h->shadow = NULL; + ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT); + ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te); + + ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW; + } + + ce->hunk->shadow = NULL; + te = ce->next; + ce->next = p->free; + p->free = ce; + ce = te; + } + } +} |
