diff options
Diffstat (limited to 'src/event/ngx_event_pipe.c')
| -rw-r--r-- | src/event/ngx_event_pipe.c | 41 |
1 files changed, 36 insertions, 5 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c index 21f084417..62663d5a4 100644 --- a/src/event/ngx_event_pipe.c +++ b/src/event/ngx_event_pipe.c @@ -66,11 +66,13 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) return NGX_ABORT; } - if (rev->active && !rev->ready) { - ngx_add_timer(rev, p->read_timeout); + if (!rev->delayed) { + if (rev->active && !rev->ready) { + ngx_add_timer(rev, p->read_timeout); - } else if (rev->timer_set) { - ngx_del_timer(rev); + } else if (rev->timer_set) { + ngx_del_timer(rev); + } } } @@ -99,9 +101,11 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) { + off_t limit; ssize_t n, size; ngx_int_t rc; ngx_buf_t *b; + ngx_msec_t delay; ngx_chain_t *chain, *cl, *ln; if (p->upstream_eof || p->upstream_error || p->upstream_done) { @@ -169,6 +173,25 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) } #endif + if (p->limit_rate) { + if (p->upstream->read->delayed) { + break; + } + + limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1) + - p->read_length; + + if (limit <= 0) { + p->upstream->read->delayed = 1; + delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1); + ngx_add_timer(p->upstream->read, delay); + break; + } + + } else { + limit = 0; + } + if (p->free_raw_bufs) { /* use the free bufs if they exist */ @@ -270,7 +293,7 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) break; } - n = p->upstream->recv_chain(p->upstream, chain, 0); + n = p->upstream->recv_chain(p->upstream, chain, limit); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe recv chain: %z", n); @@ -301,6 +324,8 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) } } + delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0; + p->read_length += n; cl = chain; p->free_raw_bufs = NULL; @@ -337,6 +362,12 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) ln->next = p->free_raw_bufs; p->free_raw_bufs = cl; } + + if (delay > 0) { + p->upstream->read->delayed = 1; + ngx_add_timer(p->upstream->read, delay); + break; + } } #if (NGX_DEBUG) |
