summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/event/ngx_event_quic.c173
-rw-r--r--src/event/ngx_event_quic.h1
2 files changed, 156 insertions, 18 deletions
diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c
index 913124b1c..0fcb02fdf 100644
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -48,8 +48,10 @@ typedef struct {
ngx_uint_t id_counter;
- uint64_t total_received;
- uint64_t max_data;
+ uint64_t received;
+ uint64_t sent;
+ uint64_t recv_max_data;
+ uint64_t send_max_data;
} ngx_quic_streams_t;
@@ -112,7 +114,6 @@ struct ngx_quic_connection_s {
ngx_quic_streams_t streams;
ngx_quic_congestion_t congestion;
- ngx_uint_t max_data;
uint64_t cur_streams;
uint64_t max_streams;
@@ -201,10 +202,14 @@ static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
+static ngx_int_t ngx_quic_handle_max_data_frame(ngx_connection_t *c,
+ ngx_quic_max_data_frame_t *f);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
+static ngx_int_t ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f);
static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
ngx_quic_frame_t *frame);
@@ -599,7 +604,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_ssl_t *ssl, ngx_quic_tp_t *tp,
ctp->ack_delay_exponent = NGX_QUIC_DEFAULT_ACK_DELAY_EXPONENT;
ctp->max_ack_delay = NGX_QUIC_DEFAULT_MAX_ACK_DELAY;
- qc->streams.max_data = qc->tp.initial_max_data;
+ qc->streams.recv_max_data = qc->tp.initial_max_data;
qc->congestion.window = ngx_min(10 * qc->tp.max_packet_size,
ngx_max(2 * qc->tp.max_packet_size, 14720));
@@ -1416,7 +1421,12 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
break;
case NGX_QUIC_FT_MAX_DATA:
- c->quic->max_data = frame.u.max_data.max_data;
+
+ if (ngx_quic_handle_max_data_frame(c, &frame.u.max_data) != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
ack_this = 1;
break;
@@ -1445,6 +1455,18 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ack_this = 1;
break;
+ case NGX_QUIC_FT_MAX_STREAM_DATA:
+
+ if (ngx_quic_handle_max_stream_data_frame(c, pkt,
+ &frame.u.max_stream_data)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ ack_this = 1;
+ break;
+
case NGX_QUIC_FT_NEW_CONNECTION_ID:
case NGX_QUIC_FT_RETIRE_CONNECTION_ID:
case NGX_QUIC_FT_NEW_TOKEN:
@@ -1452,7 +1474,6 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
case NGX_QUIC_FT_STOP_SENDING:
case NGX_QUIC_FT_PATH_CHALLENGE:
case NGX_QUIC_FT_PATH_RESPONSE:
- case NGX_QUIC_FT_MAX_STREAM_DATA:
/* TODO: handle */
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
@@ -2208,6 +2229,45 @@ ngx_quic_handle_max_streams(ngx_connection_t *c)
static ngx_int_t
+ngx_quic_handle_max_data_frame(ngx_connection_t *c,
+ ngx_quic_max_data_frame_t *f)
+{
+ ngx_event_t *wev;
+ ngx_rbtree_t *tree;
+ ngx_rbtree_node_t *node;
+ ngx_quic_stream_t *qs;
+ ngx_quic_connection_t *qc;
+
+ qc = c->quic;
+ tree = &qc->streams.tree;
+
+ if (f->max_data <= qc->streams.send_max_data) {
+ return NGX_OK;
+ }
+
+ if (qc->streams.sent >= qc->streams.send_max_data) {
+
+ for (node = ngx_rbtree_min(tree->root, tree->sentinel);
+ node;
+ node = ngx_rbtree_next(tree, node))
+ {
+ qs = (ngx_quic_stream_t *) node;
+ wev = qs->c->write;
+
+ if (wev->active) {
+ wev->ready = 1;
+ ngx_post_event(wev, &ngx_posted_events);
+ }
+ }
+ }
+
+ qc->streams.send_max_data = f->max_data;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f)
{
@@ -2279,6 +2339,44 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
}
+static ngx_int_t
+ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
+{
+ uint64_t sent;
+ ngx_event_t *wev;
+ ngx_quic_stream_t *sn;
+ ngx_quic_connection_t *qc;
+
+ qc = c->quic;
+ sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+ if (sn == NULL) {
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
+ return NGX_ERROR;
+ }
+
+ if (f->limit <= sn->send_max_data) {
+ return NGX_OK;
+ }
+
+ sent = sn->c->sent;
+
+ if (sent >= sn->send_max_data) {
+ wev = sn->c->write;
+
+ if (wev->active) {
+ wev->ready = 1;
+ ngx_post_event(wev, &ngx_posted_events);
+ }
+ }
+
+ sn->send_max_data = f->limit;
+
+ return NGX_OK;
+}
+
+
static void
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
{
@@ -2810,10 +2908,13 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, uint64_t id)
static ngx_quic_stream_t *
ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
{
- ngx_log_t *log;
- ngx_pool_t *pool;
- ngx_quic_stream_t *sn;
- ngx_pool_cleanup_t *cln;
+ ngx_log_t *log;
+ ngx_pool_t *pool;
+ ngx_quic_stream_t *sn;
+ ngx_pool_cleanup_t *cln;
+ ngx_quic_connection_t *qc;
+
+ qc = c->quic;
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
@@ -2877,6 +2978,19 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
sn->c->write->ready = 1;
}
+ if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
+ if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+ sn->send_max_data = qc->ctp.initial_max_stream_data_uni;
+ }
+
+ } else {
+ if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
+ sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_remote;
+ } else {
+ sn->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
+ }
+ }
+
cln = ngx_pool_cleanup_add(pool, 0);
if (cln == NULL) {
ngx_close_connection(sn->c);
@@ -2932,7 +3046,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_memcpy(buf, b->pos, len);
b->pos += len;
- qc->streams.total_received += len;
+ qc->streams.received += len;
if (b->pos == b->last) {
b->pos = b->start;
@@ -2963,7 +3077,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_queue_frame(pc->quic, frame);
}
- if ((qc->streams.max_data / 2) < qc->streams.total_received) {
+ if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
frame = ngx_quic_alloc_frame(pc, 0);
@@ -2971,11 +3085,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
return NGX_ERROR;
}
- qc->streams.max_data *= 2;
+ qc->streams.recv_max_data *= 2;
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_MAX_DATA;
- frame->u.max_data.max_data = qc->streams.max_data;
+ frame->u.max_data.max_data = qc->streams.recv_max_data;
ngx_sprintf(frame->info, "MAX_DATA max_data:%d level=%d on recv",
(int) frame->u.max_data.max_data, frame->level);
@@ -2984,7 +3098,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id 0x%xi recv: increased max data: %ui",
- qs->id, qc->streams.max_data);
+ qs->id, qc->streams.recv_max_data);
}
return len;
@@ -3024,6 +3138,10 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
sent = c->sent;
unacked = sent - qs->acked;
+ if (qc->streams.send_max_data == 0) {
+ qc->streams.send_max_data = qc->ctp.initial_max_data;
+ }
+
if (unacked >= NGX_QUIC_STREAM_BUFSIZE) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send hit buffer size");
@@ -3033,6 +3151,24 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
len = NGX_QUIC_STREAM_BUFSIZE - unacked;
}
+ if (qc->streams.sent >= qc->streams.send_max_data) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic send hit MAX_DATA");
+ len = 0;
+
+ } else if (qc->streams.sent + len > qc->streams.send_max_data) {
+ len = qc->streams.send_max_data - qc->streams.sent;
+ }
+
+ if (sent >= qs->send_max_data) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic send hit MAX_STREAM_DATA");
+ len = 0;
+
+ } else if (sent + len > qs->send_max_data) {
+ len = qs->send_max_data - sent;
+ }
+
p = (u_char *) buf;
end = (u_char *) buf + len;
n = 0;
@@ -3061,6 +3197,7 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
frame->u.stream.data = frame->data;
c->sent += fsize;
+ qc->streams.sent += fsize;
p += fsize;
n += fsize;
@@ -3070,9 +3207,9 @@ ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_queue_frame(qc, frame);
}
- ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic stream send %uz sent:%O, unacked:%uL",
- n, c->sent, (uint64_t) c->sent - qs->acked);
+ ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic send %uz of %uz, sent:%O, unacked:%uL",
+ n, size, c->sent, (uint64_t) c->sent - qs->acked);
if (n != size) {
c->write->ready = 0;
diff --git a/src/event/ngx_event_quic.h b/src/event/ngx_event_quic.h
index 051550de6..4482c8b7d 100644
--- a/src/event/ngx_event_quic.h
+++ b/src/event/ngx_event_quic.h
@@ -71,6 +71,7 @@ struct ngx_quic_stream_s {
ngx_connection_t *c;
uint64_t id;
uint64_t acked;
+ uint64_t send_max_data;
ngx_buf_t *b;
ngx_quic_frames_stream_t fs;
};