summaryrefslogtreecommitdiffhomepage
path: root/src/event/ngx_event_pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/ngx_event_pipe.c')
-rw-r--r--src/event/ngx_event_pipe.c41
1 files changed, 36 insertions, 5 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 21f084417..62663d5a4 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -66,11 +66,13 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
return NGX_ABORT;
}
- if (rev->active && !rev->ready) {
- ngx_add_timer(rev, p->read_timeout);
+ if (!rev->delayed) {
+ if (rev->active && !rev->ready) {
+ ngx_add_timer(rev, p->read_timeout);
- } else if (rev->timer_set) {
- ngx_del_timer(rev);
+ } else if (rev->timer_set) {
+ ngx_del_timer(rev);
+ }
}
}
@@ -99,9 +101,11 @@ ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
+ off_t limit;
ssize_t n, size;
ngx_int_t rc;
ngx_buf_t *b;
+ ngx_msec_t delay;
ngx_chain_t *chain, *cl, *ln;
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
@@ -169,6 +173,25 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
}
#endif
+ if (p->limit_rate) {
+ if (p->upstream->read->delayed) {
+ break;
+ }
+
+ limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
+ - p->read_length;
+
+ if (limit <= 0) {
+ p->upstream->read->delayed = 1;
+ delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
+ ngx_add_timer(p->upstream->read, delay);
+ break;
+ }
+
+ } else {
+ limit = 0;
+ }
+
if (p->free_raw_bufs) {
/* use the free bufs if they exist */
@@ -270,7 +293,7 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}
- n = p->upstream->recv_chain(p->upstream, chain, 0);
+ n = p->upstream->recv_chain(p->upstream, chain, limit);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe recv chain: %z", n);
@@ -301,6 +324,8 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
}
}
+ delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
+
p->read_length += n;
cl = chain;
p->free_raw_bufs = NULL;
@@ -337,6 +362,12 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}
+
+ if (delay > 0) {
+ p->upstream->read->delayed = 1;
+ ngx_add_timer(p->upstream->read, delay);
+ break;
+ }
}
#if (NGX_DEBUG)