From 956fce6614dd0979a2995c792131d894e132e614 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Tue, 24 Nov 2020 16:40:35 +0300 Subject: Libunit: improved error logging around initialization env variable. --- src/nxt_unit.c | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 097f50d6..b6904ce9 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -784,8 +784,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; - char *unit_init, *version_end; - long version_length; + char *unit_init, *version_end, *vars; + size_t version_length; int64_t ready_pid, router_pid, read_pid; uint32_t ready_stream, router_id, ready_id, read_id; @@ -797,21 +797,30 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + version_end = strchr(unit_init, ';'); + if (nxt_slow_path(version_end == NULL)) { + nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", + NXT_UNIT_INIT_ENV, unit_init); + + return NXT_UNIT_ERROR; + } - version_length = nxt_length(NXT_VERSION); + version_length = version_end - unit_init; - version_end = strchr(unit_init, ';'); - if (version_end == NULL - || version_end - unit_init != version_length - || memcmp(unit_init, NXT_VERSION, version_length) != 0) - { - nxt_unit_alert(NULL, "version check error"); + rc = version_length != nxt_length(NXT_VERSION) + || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); + + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " + "%.*s, while the app was compiled with libunit %s", + (int) version_length, unit_init, NXT_VERSION); return NXT_UNIT_ERROR; } - rc = sscanf(version_end + 1, + vars = version_end + 1; + + rc = sscanf(vars, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" @@ -823,12 +832,22 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, &read_pid, &read_id, &read_in_fd, &read_out_fd, log_fd, shm_limit); + if (nxt_slow_path(rc == EOF)) { + nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", + vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); + + return NXT_UNIT_ERROR; + } + if (nxt_slow_path(rc != 13)) { - nxt_unit_alert(NULL, "failed to scan variables: %d", rc); + nxt_unit_alert(NULL, "invalid number of variables in %s env: " + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); return NXT_UNIT_ERROR; } + nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); ready_port->in_fd = -1; -- cgit From 7389a50835696fe256c5decf31bec129f1d59bbf Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 18 Dec 2020 00:25:27 +0300 Subject: Limiting app queue notifications count in socket. Under high load, a queue synchonization issue may occur, starting from the steady state when an app queue message is dequeued immediately after it has been enqueued. In this state, the router always puts the first message in the queue and is forced to notify the app about a new message in an empty queue using a socket pair. On the other hand, the application dequeues and processes the message without reading the notification from the socket, so the socket buffer overflows with notifications. The issue was reproduced during Unit load tests. After a socket buffer overflow, the router is unable to notify the app about a new first message. When another message is enqueued, a notification is not required, so the queue grows without being read by the app. As a result, request processing stops. This patch changes the notification algorithm by counting the notifications in the pipe instead of getting the number of messages in the queue. --- src/nxt_unit.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b6904ce9..2cdc75f8 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -6092,7 +6092,10 @@ static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { - int res; + int res; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); retry: @@ -6105,6 +6108,8 @@ retry: } if (nxt_unit_is_read_queue(rbuf)) { + nxt_app_queue_notification_received(port_impl->queue); + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); -- cgit From 7b669ed866896afbf26ab6bc0737fe7c8f9c2ec5 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 18 Dec 2020 00:25:28 +0300 Subject: Libunit: fixed shared memory waiting. The nxt_unit_ctx_port_recv() function may return the NXT_UNIT_AGAIN code, in which case an attempt to reread the message should be made. The issue was reproduced in load testing with response sizes 16k and up. In the rare case of a NXT_UNIT_AGAIN result, a buffer of size -1 was processed, which triggered a 'message too small' alert; after that, the app process was terminated. --- src/nxt_unit.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 2cdc75f8..39e7f076 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -3606,7 +3606,10 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + do { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + } while (res == NXT_UNIT_AGAIN); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); -- cgit From d65a66f9d813294917822554311281c5e1a7126b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 29 Dec 2020 19:01:24 +0300 Subject: Libunit: processing single port message. This partially reverts the optimisation introduced in 1d84b9e4b459 to avoid an unpredictable block in nxt_unit_process_port_msg(). Under high load, this function may never return control to its caller, and the external event loop (in Node.js and Python asyncio) won't be able to process other scheduled events. To reproduce the issue, two request processing types are needed: 'fast' and 'furious'. The 'fast' one simply returns a small response, while the 'furious' schedules asynchronous calls to external resources. Thus, if Unit is subjected to a large amount of 'fast' requests, the 'furious' request processing freezes until the high load ends. The issue was found by Wu Jian Ping (@wujjpp) during Node.js stream implementation discussion and relates to PR #502 on GitHub. --- src/nxt_unit.c | 13 ------------- 1 file changed, 13 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 39e7f076..2fef17c5 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -5016,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { @@ -5024,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - -retry: if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5052,15 +5048,6 @@ retry: nxt_unit_process_ready_req(ctx); - if (ctx_impl->online) { - rbuf = nxt_unit_read_buf_get(ctx); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; - } - - goto retry; - } - return rc; } -- cgit