From 00561a961f8c7727556271e424e1e425ac9a88ce Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Libunit: protecting the new mmap from being used in another thread. Until the mmap is received by the router, only the creator thread may use this mmap, so the "mmap not found" state in the router is avoided. --- src/nxt_unit.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index f75d61bc..65a76bee 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -314,6 +314,7 @@ struct nxt_unit_ctx_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + pthread_t src_thread; /* of nxt_unit_read_buf_t */ nxt_queue_t awaiting_rbuf; @@ -3389,7 +3390,10 @@ retry: for (mm = lib->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { + if (hdr->sent_over != 0xFFFFu + && (hdr->sent_over != port->id.id + || mm->src_thread != pthread_self())) + { continue; } @@ -3657,6 +3661,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) hdr->src_pid = lib->pid; hdr->dst_pid = port->id.pid; hdr->sent_over = port->id.id; + mm->src_thread = pthread_self(); /* Mark first n chunk(s) as busy */ for (i = 0; i < n; i++) { -- cgit From 28ab1de364d048a4cb3f92179adebdd1eb851d65 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Libunit: gracefully quitting a multicontext application. --- src/nxt_unit.c | 96 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 24 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 65a76bee..4b0f5230 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -305,6 +305,8 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_read_buf_t */ nxt_queue_t free_rbuf; + int online; + nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -354,7 +356,6 @@ struct nxt_unit_impl_s { pid_t pid; int log_fd; - int online; nxt_unit_ctx_impl_t main_ctx; }; @@ -561,7 +562,6 @@ nxt_unit_create(nxt_unit_init_t *init) lib->ports.slot = NULL; lib->log_fd = STDERR_FILENO; - lib->online = 1; nxt_queue_init(&lib->contexts); @@ -615,10 +615,15 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_unit_lib_use(lib); + pthread_mutex_lock(&lib->mutex); + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + pthread_mutex_unlock(&lib->mutex); + ctx_impl->use_count = 1; ctx_impl->wait_items = 0; + ctx_impl->online = 1; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -1119,7 +1124,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (new_port_msg->id == (nxt_port_id_t) -1) { + if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); new_port.in_fd = recv_msg->fd[0]; @@ -1161,7 +1166,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (new_port_msg->id == (nxt_port_id_t) -1) { + if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { lib->shared_port = port; } else { @@ -4408,18 +4413,20 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + 
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rc = nxt_unit_run_once_impl(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_quit(ctx); break; } } @@ -4696,18 +4703,16 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { int rc; - nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4802,13 +4807,16 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + rc = NXT_UNIT_OK; - while (nxt_fast_path(lib->online)) { + while (nxt_fast_path(ctx_impl->online)) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4867,6 +4875,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { @@ -4874,6 +4883,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); retry: @@ -4906,7 +4916,7 @@ retry: return NXT_UNIT_ERROR; } - if (lib->online) { + if (ctx_impl->online) { goto retry; } @@ -4950,14 +4960,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) queue_fd = -1; - port = nxt_unit_create_port(ctx); + port = nxt_unit_create_port(&new_ctx->ctx); if (nxt_slow_path(port == NULL)) { goto fail; } new_ctx->read_port = port; - queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t)); if (nxt_slow_path(queue_fd == -1)) { goto fail; } @@ -4976,7 +4986,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); port_impl->queue = mem; - rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); + rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -5041,8 +5051,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) pthread_mutex_destroy(&ctx_impl->mutex); + pthread_mutex_lock(&lib->mutex); + nxt_queue_remove(&ctx_impl->link); + pthread_mutex_unlock(&lib->mutex); + if (nxt_fast_path(ctx_impl->read_port != NULL)) { nxt_unit_remove_port(lib, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); @@ -5229,7 +5243,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) } if (port_impl->queue != NULL) { - munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID) ? 
sizeof(nxt_app_queue_t) : sizeof(nxt_port_queue_t)); } @@ -5346,7 +5360,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) goto unlock; } - if (port->id.id >= process->next_port_id) { + if (port->id.id != NXT_UNIT_SHARED_PORT_ID + && port->id.id >= process->next_port_id) + { process->next_port_id = port->id.id + 1; } @@ -5514,17 +5530,49 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (lib->online) { - lib->online = 0; + if (!ctx_impl->online) { + return; + } - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); - } + ctx_impl->online = 0; + + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); } + + if (ctx != &lib->main_ctx.ctx) { + return; + } + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.pid = lib->pid; + msg.type = _NXT_PORT_MSG_QUIT; + + pthread_mutex_lock(&lib->mutex); + + nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { + + if (ctx == &ctx_impl->ctx + || ctx_impl->read_port == NULL + || ctx_impl->read_port->out_fd == -1) + { + continue; + } + + (void) nxt_unit_port_send(ctx, ctx_impl->read_port, + &msg, sizeof(msg), NULL, 0); + + } nxt_queue_loop; + + pthread_mutex_unlock(&lib->mutex); } -- cgit From a5508cec7a55fe04ab66451c7510fab0e0d4577c Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Libunit: added a function to discern main and worker contexts. --- src/nxt_unit.c | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 4b0f5230..6a0657c6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4854,6 +4854,17 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) } +int +nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + return (ctx == &lib->main_ctx.ctx); +} + + int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { -- cgit From 131b6a7ffab7b3303a00a50f5cf764dc99e23cc0 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Libunit: releasing cached read buffers when destroying context. --- src/nxt_unit.c | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 6a0657c6..7d2bf2c7 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -5023,6 +5023,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_read_buf_t *rbuf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; @@ -5060,6 +5061,13 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) } nxt_queue_loop; + nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link) + { + if (rbuf != &ctx_impl->ctx_read_buf) { + nxt_unit_free(&ctx_impl->ctx, rbuf); + } + } nxt_queue_loop; + pthread_mutex_destroy(&ctx_impl->mutex); pthread_mutex_lock(&lib->mutex); -- cgit From 4cb8aeb31a8cf47f6c61aaccb95bbbf47cbc2393 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Router: introducing the PORT_ACK message. The PORT_ACK message is the router's response to the application's NEW_PORT message. 
After receiving PORT_ACK, the application is safe to process requests using this port. This message avoids a racing condition when the application starts processing a request from the shared queue and sends REQ_HEADERS_ACK. The REQ_HEADERS_ACK message contains the application port ID as reply_port, which the router uses to send request data. When the application creates a new port, it immediately sends it to the main router thread. Because the request is processed outside the main thread, a racing condition can occur between the receipt of the new port in the main thread and the receipt of REQ_HEADERS_ACK in the worker router thread where the same port is specified as reply_port. --- src/nxt_unit.c | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7d2bf2c7..d35a3307 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -57,6 +57,7 @@ static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, @@ -306,6 +307,7 @@ struct nxt_unit_ctx_impl_s { nxt_queue_t free_rbuf; int online; + int ready; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -624,6 +626,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->use_count = 1; ctx_impl->wait_items = 0; ctx_impl->online = 1; + ctx_impl->ready = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -996,6 +999,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = nxt_unit_process_new_port(ctx, &recv_msg); break; + case _NXT_PORT_MSG_PORT_ACK: + rc = nxt_unit_ctx_ready(ctx); + break; + case _NXT_PORT_MSG_CHANGE_FILE: nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", port_msg->stream, recv_msg.fd[0]); @@ -1169,8 +1176,28 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { lib->shared_port = port; - } else { - nxt_unit_port_release(port); + return nxt_unit_ctx_ready(ctx); + } + + nxt_unit_port_release(port); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + ctx_impl->ready = 1; + + if (lib->callbacks.ready_handler) { + return lib->callbacks.ready_handler(ctx); } return NXT_UNIT_OK; @@ -4495,17 +4522,17 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_port_impl_t *port_impl; struct pollfd fds[2]; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - + if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, port); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + retry: if (port_impl->from_socket == 0) { -- cgit From 
d8cc830ea0363009e40d6bf380db1147ad6fb41e Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Libunit: waking another context with the RPC_READY message. --- src/nxt_unit.c | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index d35a3307..7e97c050 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -107,6 +107,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); +static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, + nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); @@ -988,6 +990,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) switch (port_msg->type) { + case _NXT_PORT_MSG_RPC_READY: + rc = NXT_UNIT_OK; + break; + case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); @@ -1068,7 +1074,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; default: - nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", + nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); rc = NXT_UNIT_ERROR; @@ -4012,12 +4018,40 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + nxt_unit_awake_ctx(ctx, ctx_impl); + } nxt_queue_loop; return rc; } +static void +nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_port_msg_t msg; + + if (nxt_fast_path(ctx == &ctx_impl->ctx)) { + return; + } + + if (nxt_slow_path(ctx_impl->read_port == NULL + || ctx_impl->read_port->out_fd == -1)) + { + nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); + + return; + } + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_RPC_READY; + + (void) nxt_unit_port_send(ctx, ctx_impl->read_port, + &msg, sizeof(msg), NULL, 0); +} + + static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) { @@ -5390,6 +5424,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + nxt_unit_awake_ctx(ctx, ctx_impl); + } nxt_queue_loop; return old_port; -- cgit From 80a8cb835bb780cdb3047b232809c5dfd6e0e794 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 28 Oct 2020 00:01:46 +0300 Subject: Preserving the app port write socket. The socket is required for intercontextual communication in multithreaded apps. 
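Preserving the out_fd is what makes a cross-context wake-up such as nxt_unit_awake_ctx() above possible: any thread holding the write end can nudge a context that is blocked waiting on its read port. The following standalone sketch (plain POSIX, hypothetical names, not Unit code) shows the underlying pattern of waking a thread parked in poll() by writing a small control message to the peer socket:

    #include <poll.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* A one-byte control message stands in for the real nxt_port_msg_t. */
    #define WAKE_MSG  'W'

    static void *
    worker(void *arg)
    {
        int            fd = *(int *) arg;
        char           c;
        struct pollfd  pfd = { .fd = fd, .events = POLLIN };

        /* Blocks until another thread writes to the peer socket. */
        if (poll(&pfd, 1, -1) == 1 && read(fd, &c, 1) == 1) {
            printf("worker woken by '%c'\n", c);
        }

        return NULL;
    }

    int
    main(void)
    {
        int        sv[2];
        char       msg = WAKE_MSG;
        pthread_t  tid;

        /* sv[0] plays the context's read port, sv[1] the preserved write socket. */
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
            return 1;
        }

        pthread_create(&tid, NULL, worker, &sv[0]);

        /* Another context "wakes" the worker, as nxt_unit_awake_ctx() does. */
        write(sv[1], &msg, 1);

        pthread_join(tid, NULL);
        close(sv[0]);
        close(sv[1]);

        return 0;
    }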
--- src/nxt_unit.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7e97c050..23848f5b 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -780,7 +780,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, uint32_t *shm_limit) { int rc; - int ready_fd, router_fd, read_fd; + int ready_fd, router_fd, read_in_fd, read_out_fd; char *unit_init, *version_end; long version_length; int64_t ready_pid, router_pid, read_pid; @@ -812,15 +812,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" - "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d,%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, - &read_pid, &read_id, &read_fd, + &read_pid, &read_id, &read_in_fd, &read_out_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 12)) { + if (nxt_slow_path(rc != 13)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -840,8 +840,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); - read_port->in_fd = read_fd; - read_port->out_fd = -1; + read_port->in_fd = read_in_fd; + read_port->out_fd = read_out_fd; read_port->data = NULL; *stream = ready_stream; -- cgit From 896d8e8bfb3d8649db467d92e06c789b789d3feb Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 10 Nov 2020 22:27:08 +0300 Subject: Fixing multi-buffer body send to application. The application shared queue is only capable of passing one shared memory buffer. The remaining buffers in the chain need to be sent directly to the application in response to the REQ_HEADERS_ACK message. The issue can be reproduced with configurations where 'body_buffer_size' is greater than the memory segment size (10 MB). Requests with a body larger than 10 MB simply get stuck: they are never passed to the application, which keeps waiting for more data from the router. The bug was introduced in 1d84b9e4b459 (v1.19.0). --- src/nxt_unit.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 23848f5b..7230552d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1357,8 +1357,14 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (recv_msg->incoming_buf != NULL) { b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + while (b->next != NULL) { + b = b->next; + } + /* "Move" incoming buffer list to req_impl. */ - nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + b->next = recv_msg->incoming_buf; + b->next->prev = &b->next; + recv_msg->incoming_buf = NULL; } @@ -2988,8 +2994,6 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); - nxt_unit_req_debug(req, "read: %d", (int) buf_res); - if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); if (nxt_slow_path(res < 0)) { -- cgit From 8340ca0b9c7ad4109033ccb028f87cc1b73396bc Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: improving logging consistency. Debug logging depends on macros defined in nxt_auto_config.h.
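The hunk below wraps per-allocation debug messages in NXT_DEBUG_ALLOC guards so they disappear from normal builds. A rough standalone illustration of the same compile-time gating (hypothetical DEBUG_ALLOC/alloc_debug names, not Unit's macros):

    #include <stdio.h>
    #include <stdlib.h>

    /* Build with -DDEBUG_ALLOC to compile the per-allocation tracing in. */
    #if defined(DEBUG_ALLOC)
    #define alloc_debug(fmt, ...)  fprintf(stderr, "alloc: " fmt "\n", __VA_ARGS__)
    #else
    #define alloc_debug(fmt, ...)  ((void) 0)
    #endif

    static void *
    traced_malloc(size_t size)
    {
        void  *p;

        p = malloc(size);

        if (p != NULL) {
            /* Only reported in debug builds. */
            alloc_debug("malloc(%zu): %p", size, p);

        } else {
            /* Failures are always reported, as in the libunit hunk. */
            fprintf(stderr, "malloc(%zu) failed\n", size);
        }

        return p;
    }

    int
    main(void)
    {
        free(traced_malloc(64));
        return 0;
    }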
--- src/nxt_unit.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7230552d..564fd094 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -6527,7 +6527,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) p = malloc(size); if (nxt_fast_path(p != NULL)) { +#if (NXT_DEBUG_ALLOC) nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); +#endif } else { nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", @@ -6541,7 +6543,9 @@ nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) { +#if (NXT_DEBUG_ALLOC) nxt_unit_debug(ctx, "free(%p)", p); +#endif free(p); } -- cgit From 0ec69aa46e3577ef44060b9dd8576faab4a863ce Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: fixing racing condition for port add / state change. The issue only occurred in Go applications because "port_send" is overloaded only in Go. To reproduce it, send multiple concurrent requests to the application after it has initialised. The warning message "[unit] [go] port NNN:dd not found" is the first visible aspect of the issue; the second and more valuable one is a closed connection, an error response, or a hanging response to some requests. When the application starts, it is unaware of the router's worker thread ports, so it requests the ports from the router after receiving requests from the corresponding router worker threads. When multiple requests are processed simultaneously, the router port may be required by several requests, so request processing starts only after the application receives the required port information. The port should be added to the Go port repository after its 'ready' flag is updated. Otherwise, Unit may start processing some requests and use the port before it is in the repository. The issue was introduced in changeset 78836321a126. 
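The diff that follows reorders the port bookkeeping so that the 'ready' flag is set, and any requests parked on the port are released, only after callbacks.add_port() has returned. A reduced, self-contained sketch of that ordering, using hypothetical names and a plain counter in place of the awaiting_req queue:

    #include <pthread.h>
    #include <stdio.h>

    /* Reduced model: a readiness flag plus a counter standing in for awaiting_req. */
    typedef struct {
        pthread_mutex_t  mutex;
        int              ready;
        int              waiting;
    } port_t;

    static void
    register_port(port_t *port)            /* stands in for callbacks.add_port() */
    {
        (void) port;
        printf("port registered in the embedding's repository\n");
    }

    static void
    add_port(port_t *port)
    {
        int  drained;

        /* 1. Let the embedding (e.g. Go) register the port first... */
        register_port(port);

        /* 2. ...and only then publish it and release parked requests. */
        pthread_mutex_lock(&port->mutex);

        port->ready = 1;
        drained = port->waiting;
        port->waiting = 0;

        pthread_mutex_unlock(&port->mutex);

        printf("released %d parked request(s)\n", drained);
    }

    int
    main(void)
    {
        port_t  port = { PTHREAD_MUTEX_INITIALIZER, 0, 2 };

        add_port(&port);

        return 0;
    }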
--- src/nxt_unit.c | 126 +++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 39 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 564fd094..69cae8bb 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -147,6 +147,8 @@ nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); +static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, + nxt_queue_t *awaiting_req); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -5340,14 +5342,12 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { - int rc; - nxt_queue_t awaiting_req; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *new_port, *old_port_impl; - nxt_unit_request_info_impl_t *req_impl; + int rc, ready; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_port_impl_t *new_port, *old_port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -5396,46 +5396,45 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) old_port_impl->queue = queue; } - if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { - nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); - nxt_queue_init(&old_port_impl->awaiting_req); - } - - old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + ready = (port->in_fd != -1 || port->out_fd != -1); - pthread_mutex_unlock(&lib->mutex); + /* + * Port can be market as 'ready' only after callbacks.add_port() call. + * Otherwise, request may try to use the port before callback. 
+ */ + if (lib->callbacks.add_port == NULL && ready) { + old_port_impl->ready = ready; - if (lib->callbacks.add_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { - lib->callbacks.add_port(ctx, old_port); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } } - nxt_queue_each(req_impl, &awaiting_req, - nxt_unit_request_info_impl_t, port_wait_link) - { - nxt_queue_remove(&req_impl->port_wait_link); - - ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, - ctx); + pthread_mutex_unlock(&lib->mutex); - pthread_mutex_lock(&ctx_impl->mutex); + if (lib->callbacks.add_port != NULL && ready) { + lib->callbacks.add_port(ctx, old_port); - nxt_queue_insert_tail(&ctx_impl->ready_req, - &req_impl->port_wait_link); + pthread_mutex_lock(&lib->mutex); - pthread_mutex_unlock(&ctx_impl->mutex); + old_port_impl->ready = ready; - nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } - nxt_unit_awake_ctx(ctx, ctx_impl); + pthread_mutex_unlock(&lib->mutex); + } - } nxt_queue_loop; + nxt_unit_process_awaiting_req(ctx, &awaiting_req); return old_port; } new_port = NULL; + ready = 0; nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, @@ -5478,13 +5477,21 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) new_port->use_count = 2; new_port->process = process; - new_port->ready = (port->in_fd != -1 || port->out_fd != -1); new_port->queue = queue; new_port->from_socket = 0; new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); + ready = (port->in_fd != -1 || port->out_fd != -1); + + if (lib->callbacks.add_port == NULL) { + new_port->ready = ready; + + } else { + new_port->ready = 0; + } + process = NULL; unlock: @@ -5495,14 +5502,55 @@ unlock: nxt_unit_process_release(process); } - if (lib->callbacks.add_port != NULL - && new_port != NULL - && (port->in_fd != -1 || port->out_fd != -1)) - { + if (lib->callbacks.add_port != NULL && new_port != NULL && ready) { lib->callbacks.add_port(ctx, &new_port->port); + + nxt_queue_init(&awaiting_req); + + pthread_mutex_lock(&lib->mutex); + + new_port->ready = 1; + + if (!nxt_queue_is_empty(&new_port->awaiting_req)) { + nxt_queue_add(&awaiting_req, &new_port->awaiting_req); + nxt_queue_init(&new_port->awaiting_req); + } + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_awaiting_req(ctx, &awaiting_req); } - return &new_port->port; + return (new_port == NULL) ? 
NULL : &new_port->port; +} + + +static void +nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) +{ + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_each(req_impl, awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + nxt_unit_awake_ctx(ctx, ctx_impl); + + } nxt_queue_loop; } -- cgit From d26afcb481d97cd71db014b16bde44e807043a2b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: fixing racing condition in request struct recycling. The issue occurred under highly concurrent request load in Go applications. Such applications are multi-threaded but use a single libunit context; any thread-safe code in the libunit context is only required for Go applications. As a result of improper request state reset, the recycled request structure was recovered in the released state, so further operations with this request resulted in 'response already sent' warnings. However, the actual response was never delivered to the router and the client. --- src/nxt_unit.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 69cae8bb..f0c68374 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1751,6 +1751,8 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response_port = NULL; } + req_impl->state = NXT_UNIT_RS_RELEASED; + pthread_mutex_lock(&ctx_impl->mutex); nxt_queue_remove(&req_impl->link); @@ -1758,8 +1760,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); pthread_mutex_unlock(&ctx_impl->mutex); - - req_impl->state = NXT_UNIT_RS_RELEASED; } -- cgit From 8132e1f700934a32bc9e3fb0ab66f550a335a326 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Go: removing C proxy functions and re-using goroutines. 
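The patch below threads a new preq out-parameter through nxt_unit_process_msg() and adds nxt_unit_dequeue_request(), which hands a parsed request back to the caller instead of invoking the request_handler callback. A hedged usage sketch, assuming the new declarations are exported through the public nxt_unit.h header and trimming all error handling, might look like this in an embedding's own worker loop:

    #include <nxt_unit.h>    /* assumed location of the public declarations */

    /*
     * Pull-style dispatch: instead of letting nxt_unit_run_shared() call
     * callbacks.request_handler, the embedding dequeues a request itself and
     * schedules it on a thread (or goroutine) of its own choosing.
     */
    static void
    serve_one(nxt_unit_ctx_t *ctx)
    {
        nxt_unit_request_info_t  *req;

        /* NULL means the shared queue is empty or the context went offline. */
        req = nxt_unit_dequeue_request(ctx);
        if (req == NULL) {
            return;
        }

        /* Hand off to application code; here the request is simply completed. */
        nxt_unit_request_done(req, NXT_UNIT_OK);
    }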
--- src/nxt_unit.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 12 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index f0c68374..44525d04 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -54,12 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); -static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, @@ -904,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) static int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq) { int rc; pid_t pid; @@ -1040,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; case _NXT_PORT_MSG_REQ_HEADERS: - rc = nxt_unit_process_req_headers(ctx, &recv_msg); + rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); break; case _NXT_PORT_MSG_REQ_BODY: @@ -1213,7 +1215,8 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) static int -nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_request_info_t **preq) { int res; nxt_unit_impl_t *lib; @@ -1329,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } } - lib->callbacks.request_handler(req); + if (preq == NULL) { + lib->callbacks.request_handler(req); + + } else { + *preq = req; + } } return NXT_UNIT_OK; @@ -2179,7 +2187,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req, resp = req->response; if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { - nxt_unit_req_warn(req, "add_field: too many response fields"); + nxt_unit_req_warn(req, "add_field: too many response fields (%d)", + (int) resp->fields_count); return NXT_UNIT_ERROR; } @@ -2356,6 +2365,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); + return NULL; } @@ -4537,7 +4548,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } @@ -4686,7 +4697,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { - rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL); } else { nxt_unit_read_buf_release(ctx, rbuf); @@ -4793,7 +4804,7 @@ 
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) goto retry; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } @@ -4902,7 +4913,7 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) break; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } @@ -4921,6 +4932,46 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) } +nxt_unit_request_info_t * +nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + req = NULL; + + if (nxt_slow_path(!ctx_impl->online)) { + goto done; + } + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + goto done; + } + + rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (rc != NXT_UNIT_OK) { + goto done; + } + + (void) nxt_unit_process_msg(ctx, rbuf, &req); + +done: + + nxt_unit_ctx_release(ctx); + + return req; +} + + int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) { @@ -4977,7 +5028,7 @@ retry: return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } -- cgit From 300347a5cffa4187921384bdd02c5bb90875f9e5 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: making minor tweaks. Removing unnecessary context operations from shared queue processing loop. Initializing temporary queues only when required. --- src/nxt_unit.c | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 44525d04..e74c8370 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4675,8 +4675,6 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; - nxt_queue_init(&pending_rbuf); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); @@ -4687,6 +4685,8 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) return NXT_UNIT_OK; } + nxt_queue_init(&pending_rbuf); + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); nxt_queue_init(&ctx_impl->pending_rbuf); @@ -4719,8 +4719,6 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; - nxt_queue_init(&ready_req); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); @@ -4731,6 +4729,8 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) return; } + nxt_queue_init(&ready_req); + nxt_queue_add(&ready_req, &ctx_impl->ready_req); nxt_queue_init(&ctx_impl->ready_req); @@ -4917,13 +4917,6 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } - - rc = nxt_unit_process_pending_rbuf(ctx); - if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { - break; - } - - nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); -- cgit From 6c3c83561a97b91f18a771e0c582c5ed4013a9a6 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: closing active requests on quit. 
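The change below walks the context's active_req queue at quit time and finishes every outstanding request, preferring the application's close_handler and falling back to nxt_unit_request_done(req, NXT_UNIT_ERROR). A generic standalone sketch of that drain-on-shutdown pattern (hypothetical list type, not Unit's nxt_queue macros):

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct request request_t;

    struct request {
        int        id;
        request_t  *next;
    };

    /* Optional user hook, analogous to callbacks.close_handler. */
    static void (*close_handler)(request_t *req);

    static void
    finish_with_error(request_t *req)   /* analogous to request_done(ERROR) */
    {
        printf("request %d finished with an error\n", req->id);
        free(req);
    }

    static void
    quit(request_t *active)
    {
        request_t  *req, *next;

        /* Every still-active request is completed before the context goes away. */
        for (req = active; req != NULL; req = next) {
            next = req->next;

            if (close_handler != NULL) {
                close_handler(req);

            } else {
                finish_with_error(req);
            }
        }
    }

    int
    main(void)
    {
        request_t  *a = malloc(sizeof(request_t));
        request_t  *b = malloc(sizeof(request_t));

        a->id = 1;  a->next = b;
        b->id = 2;  b->next = NULL;

        quit(a);

        return 0;
    }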
--- src/nxt_unit.c | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index e74c8370..69948954 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1643,8 +1643,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } if (recv_msg->last) { - req_impl->websocket = 0; - if (cb->close_handler) { nxt_unit_req_debug(req, "close_handler"); @@ -1737,8 +1735,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } - req_impl->websocket = 0; - while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -5708,9 +5704,12 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { - nxt_port_msg_t msg; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_callbacks_t *cb; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -5721,10 +5720,30 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) ctx_impl->online = 0; - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); + cb = &lib->callbacks; + + if (cb->quit != NULL) { + cb->quit(ctx); } + nxt_queue_each(req_impl, &ctx_impl->active_req, + nxt_unit_request_info_impl_t, link) + { + req = &req_impl->req; + + nxt_unit_req_warn(req, "active request on ctx quit"); + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + + } nxt_queue_loop; + if (ctx != &lib->main_ctx.ctx) { return; } -- cgit From 66bb41e8bbe81d82a66f0d7188ad89963b4cd251 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Libunit: fixing read buffer allocations on exit. --- src/nxt_unit.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 69948954..0790afc4 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -5029,12 +5029,12 @@ retry: nxt_unit_process_ready_req(ctx); - rbuf = nxt_unit_read_buf_get(ctx); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; - } - if (ctx_impl->online) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + goto retry; } -- cgit From 2a381a82a6e1bc2bd5d2f43a08fce50a1994f2e8 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 19 Nov 2020 13:49:12 +0300 Subject: Libunit: fixing read buffer leakage. If shared queue is empty, allocated read buffer should be explicitly released. Found by Coverity (CID 363943). The issue was introduced in f5ba5973a0a3. --- src/nxt_unit.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0790afc4..097f50d6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4948,6 +4948,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); if (rc != NXT_UNIT_OK) { + nxt_unit_read_buf_release(ctx, rbuf); goto done; } -- cgit
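The one-line fix above illustrates a general rule in this code path: every early exit taken after nxt_unit_read_buf_get() succeeds must release the buffer it acquired. A minimal standalone sketch of that acquire/early-exit/release discipline (hypothetical buffer helpers, not libunit's):

    #include <stdio.h>
    #include <stdlib.h>

    /* Hypothetical stand-ins for nxt_unit_read_buf_get()/_release(). */
    static void *buf_get(void)           { return malloc(4096); }
    static void  buf_release(void *buf)  { free(buf); }

    /* Pretend the shared queue is empty, as in the Coverity report. */
    static int
    queue_recv(void *buf)
    {
        (void) buf;
        return -1;
    }

    static int
    dequeue(void)
    {
        int   rc;
        void  *buf;

        buf = buf_get();
        if (buf == NULL) {
            return -1;
        }

        rc = queue_recv(buf);
        if (rc != 0) {
            buf_release(buf);            /* the release the fix above adds */
            return rc;
        }

        /* ...process the message; the buffer is released when done... */
        buf_release(buf);

        return 0;
    }

    int
    main(void)
    {
        printf("dequeue rc = %d\n", dequeue());
        return 0;
    }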