summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorVladimir Homutov <vl@nginx.com>2020-04-15 11:11:54 +0300
committerVladimir Homutov <vl@nginx.com>2020-04-15 11:11:54 +0300
commit9542c975b7786a9f799093963720022ea88c74f1 (patch)
treecda0a417b6390e8b5bd8ae94595dd712d195ec31 /src
parent081682cd3ce97323f1d7aeb1c0db2044fc245c88 (diff)
downloadnginx-9542c975b7786a9f799093963720022ea88c74f1.tar.gz
nginx-9542c975b7786a9f799093963720022ea88c74f1.tar.bz2
Added reordering support for STREAM frames.
Each stream node now includes incoming frames queue and sent/received counters for tracking offset. The sent counter is not used, c->sent is used, not like in crypto buffers, which have no connections.
Diffstat (limited to 'src')
-rw-r--r--src/event/ngx_event_quic.c180
-rw-r--r--src/event/ngx_event_quic.h51
2 files changed, 159 insertions, 72 deletions
diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c
index b76a09f67..aabbb6114 100644
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -70,15 +70,6 @@ typedef struct {
} ngx_quic_send_ctx_t;
-/* ordered frames stream context */
-typedef struct {
- uint64_t sent;
- uint64_t received;
- ngx_queue_t frames;
- size_t total; /* size of buffered data */
-} ngx_quic_frames_stream_t;
-
-
struct ngx_quic_connection_s {
ngx_str_t scid;
ngx_str_t dcid;
@@ -177,7 +168,12 @@ static ngx_int_t ngx_quic_handle_ordered_frame(ngx_connection_t *c,
static ngx_int_t ngx_quic_crypto_input(ngx_connection_t *c,
ngx_quic_frame_t *frame);
static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
- ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
+ ngx_quic_header_t *pkt, ngx_quic_frame_t *frame);
+static ngx_int_t ngx_quic_stream_input(ngx_connection_t *c,
+ ngx_quic_frame_t *frame);
+static ngx_quic_stream_t *ngx_quic_add_stream(ngx_connection_t *c,
+ ngx_quic_stream_frame_t *f);
+
static ngx_int_t ngx_quic_handle_max_streams(ngx_connection_t *c);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
@@ -739,6 +735,7 @@ ngx_quic_close_connection(ngx_connection_t *c)
#if (NGX_DEBUG)
ngx_uint_t ns;
#endif
+ ngx_uint_t i;
ngx_pool_t *pool;
ngx_event_t *rev;
ngx_rbtree_t *tree;
@@ -748,11 +745,14 @@ ngx_quic_close_connection(ngx_connection_t *c)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "close quic connection");
- // TODO: free frames from reorder queue if any
-
qc = c->quic;
if (qc) {
+
+ for (i = 0; i < NGX_QUIC_ENCRYPTION_LAST; i++) {
+ ngx_quic_free_frames(c, &qc->crypto[i].frames);
+ }
+
qc->closing = 1;
tree = &qc->streams.tree;
@@ -1201,9 +1201,7 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
case NGX_QUIC_FT_STREAM6:
case NGX_QUIC_FT_STREAM7:
- if (ngx_quic_handle_stream_frame(c, pkt, &frame.u.stream)
- != NGX_OK)
- {
+ if (ngx_quic_handle_stream_frame(c, pkt, &frame) != NGX_OK) {
return NGX_ERROR;
}
@@ -1441,6 +1439,7 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
ngx_quic_frame_t *frame, ngx_quic_frame_handler_pt handler)
{
size_t full_len;
+ ngx_int_t rc;
ngx_queue_t *q;
ngx_quic_ordered_frame_t *f;
@@ -1468,10 +1467,17 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */
- if (handler(c, frame) != NGX_OK) {
+ rc = handler(c, frame);
+ if (rc == NGX_ERROR) {
return NGX_ERROR;
+
+ } else if (rc == NGX_DONE) {
+ /* handler destroyed stream, queue no longer exists */
+ return NGX_OK;
}
+ /* rc == NGX_OK */
+
fs->received += f->length;
/* now check the queue if we can continue with buffered frames */
@@ -1512,8 +1518,14 @@ ngx_quic_handle_ordered_frame(ngx_connection_t *c, ngx_quic_frames_stream_t *fs,
/* f->offset == fs->received */
- if (handler(c, frame) != NGX_OK) {
+ rc = handler(c, frame);
+
+ if (rc == NGX_ERROR) {
return NGX_ERROR;
+
+ } else if (rc == NGX_DONE) {
+ /* handler destroyed stream, queue no longer exists */
+ return NGX_OK;
}
fs->received += f->length;
@@ -1721,20 +1733,54 @@ ngx_quic_crypto_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
static ngx_int_t
-ngx_quic_handle_stream_frame(ngx_connection_t *c,
- ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
+ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
+ ngx_quic_frame_t *frame)
{
- size_t n;
- ngx_buf_t *b;
- ngx_event_t *rev;
- ngx_quic_stream_t *sn;
- ngx_quic_connection_t *qc;
+ ngx_quic_stream_t *sn;
+ ngx_quic_connection_t *qc;
+ ngx_quic_stream_frame_t *f;
+ ngx_quic_frames_stream_t *fs;
qc = c->quic;
+ f = &frame->u.stream;
sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
- if (sn) {
+ if (sn == NULL) {
+ sn = ngx_quic_add_stream(c, f);
+ if (sn == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ fs = &sn->fs;
+
+ return ngx_quic_handle_ordered_frame(c, fs, frame, ngx_quic_stream_input);
+}
+
+
+static ngx_int_t
+ngx_quic_stream_input(ngx_connection_t *c, ngx_quic_frame_t *frame)
+{
+ ngx_buf_t *b;
+ ngx_event_t *rev;
+ ngx_quic_stream_t *sn;
+ ngx_quic_connection_t *qc;
+ ngx_quic_stream_frame_t *f;
+
+ qc = c->quic;
+
+ f = &frame->u.stream;
+
+ sn = ngx_quic_find_stream(&qc->streams.tree, f->stream_id);
+
+ if (sn == NULL) {
+ // TODO: possible?
+ // deleted while stream is in reordering queue?
+ return NGX_ERROR;
+ }
+
+ if (sn->fs.received != 0) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b;
@@ -1761,9 +1807,50 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
rev->handler(rev);
}
+ /* check if stream was destroyed */
+ if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
+ return NGX_DONE;
+ }
+
return NGX_OK;
}
+ b = sn->b;
+ b->last = ngx_cpymem(b->last, f->data, f->length);
+
+ rev = sn->c->read;
+ rev->ready = 1;
+
+ if (f->fin) {
+ rev->pending_eof = 1;
+ }
+
+ if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
+ ngx_quic_handle_max_streams(c);
+ }
+
+ qc->streams.handler(sn->c);
+
+ /* check if stream was destroyed */
+ if (ngx_quic_find_stream(&qc->streams.tree, f->stream_id) == NULL) {
+ return NGX_DONE;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_quic_stream_t *
+ngx_quic_add_stream(ngx_connection_t *c, ngx_quic_stream_frame_t *f)
+{
+ size_t n;
+ ngx_quic_stream_t *sn;
+ ngx_quic_connection_t *qc;
+
+ qc = c->quic;
+
+ // TODO: check increasing IDs
+
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "stream is new");
n = (f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL)
@@ -1776,31 +1863,15 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
if (n < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
- return NGX_ERROR;
+ return NULL;
}
sn = ngx_quic_create_stream(c, f->stream_id, n);
if (sn == NULL) {
- return NGX_ERROR;
- }
-
- b = sn->b;
- b->last = ngx_cpymem(b->last, f->data, f->length);
-
- rev = sn->c->read;
- rev->ready = 1;
-
- if (f->fin) {
- rev->pending_eof = 1;
- }
-
- if ((f->stream_id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0) {
- ngx_quic_handle_max_streams(c);
+ return NULL;
}
- qc->streams.handler(sn->c);
-
- return NGX_OK;
+ return sn;
}
@@ -2024,7 +2095,6 @@ ngx_quic_output_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
return NGX_ERROR;
}
-
} while (q != ngx_queue_sentinel(&ctx->frames));
return NGX_OK;
@@ -2037,15 +2107,19 @@ ngx_quic_free_frames(ngx_connection_t *c, ngx_queue_t *frames)
ngx_queue_t *q;
ngx_quic_frame_t *f;
- q = ngx_queue_head(frames);
-
do {
+ q = ngx_queue_head(frames);
+
+ if (q == ngx_queue_sentinel(frames)) {
+ break;
+ }
+
+ ngx_queue_remove(q);
+
f = ngx_queue_data(q, ngx_quic_frame_t, queue);
- q = ngx_queue_next(q);
ngx_quic_free_frame(c, f);
-
- } while (q != ngx_queue_sentinel(frames));
+ } while (1);
}
@@ -2237,7 +2311,7 @@ ngx_quic_retransmit_handler(ngx_event_t *ev)
static void
ngx_quic_push_handler(ngx_event_t *ev)
{
- ngx_connection_t *c;
+ ngx_connection_t *c;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "push timer");
@@ -2430,6 +2504,8 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id, size_t rcvbuf_size)
return NULL;
}
+ ngx_queue_init(&sn->fs.frames);
+
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_destroy_pool(pool);
@@ -2595,6 +2671,8 @@ ngx_quic_stream_cleanup_handler(void *data)
return;
}
+ ngx_quic_free_frames(pc, &qs->fs.frames);
+
if ((qs->id & 0x03) == NGX_QUIC_STREAM_UNIDIRECTIONAL) {
/* do not send fin for client unidirectional streams */
return;
diff --git a/src/event/ngx_event_quic.h b/src/event/ngx_event_quic.h
index baf7fb36c..c843fbabe 100644
--- a/src/event/ngx_event_quic.h
+++ b/src/event/ngx_event_quic.h
@@ -29,33 +29,42 @@
typedef struct {
/* configurable */
- ngx_msec_t max_idle_timeout;
- ngx_msec_t max_ack_delay;
-
- ngx_uint_t max_packet_size;
- ngx_uint_t initial_max_data;
- ngx_uint_t initial_max_stream_data_bidi_local;
- ngx_uint_t initial_max_stream_data_bidi_remote;
- ngx_uint_t initial_max_stream_data_uni;
- ngx_uint_t initial_max_streams_bidi;
- ngx_uint_t initial_max_streams_uni;
- ngx_uint_t ack_delay_exponent;
- ngx_uint_t disable_active_migration;
- ngx_uint_t active_connection_id_limit;
+ ngx_msec_t max_idle_timeout;
+ ngx_msec_t max_ack_delay;
+
+ ngx_uint_t max_packet_size;
+ ngx_uint_t initial_max_data;
+ ngx_uint_t initial_max_stream_data_bidi_local;
+ ngx_uint_t initial_max_stream_data_bidi_remote;
+ ngx_uint_t initial_max_stream_data_uni;
+ ngx_uint_t initial_max_streams_bidi;
+ ngx_uint_t initial_max_streams_uni;
+ ngx_uint_t ack_delay_exponent;
+ ngx_uint_t disable_active_migration;
+ ngx_uint_t active_connection_id_limit;
/* TODO */
- ngx_uint_t original_connection_id;
- u_char stateless_reset_token[16];
- void *preferred_address;
+ ngx_uint_t original_connection_id;
+ u_char stateless_reset_token[16];
+ void *preferred_address;
} ngx_quic_tp_t;
+typedef struct {
+ uint64_t sent;
+ uint64_t received;
+ ngx_queue_t frames; /* reorder queue */
+ size_t total; /* size of buffered data */
+} ngx_quic_frames_stream_t;
+
+
struct ngx_quic_stream_s {
- ngx_rbtree_node_t node;
- ngx_connection_t *parent;
- ngx_connection_t *c;
- uint64_t id;
- ngx_buf_t *b;
+ ngx_rbtree_node_t node;
+ ngx_connection_t *parent;
+ ngx_connection_t *c;
+ uint64_t id;
+ ngx_buf_t *b;
+ ngx_quic_frames_stream_t fs;
};