From 956fce6614dd0979a2995c792131d894e132e614 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Tue, 24 Nov 2020 16:40:35 +0300 Subject: Libunit: improved error logging around initialization env variable. --- src/nxt_unit.c | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 097f50d6..b6904ce9 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -784,8 +784,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; - char *unit_init, *version_end; - long version_length; + char *unit_init, *version_end, *vars; + size_t version_length; int64_t ready_pid, router_pid, read_pid; uint32_t ready_stream, router_id, ready_id, read_id; @@ -797,21 +797,30 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + version_end = strchr(unit_init, ';'); + if (nxt_slow_path(version_end == NULL)) { + nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", + NXT_UNIT_INIT_ENV, unit_init); + + return NXT_UNIT_ERROR; + } - version_length = nxt_length(NXT_VERSION); + version_length = version_end - unit_init; - version_end = strchr(unit_init, ';'); - if (version_end == NULL - || version_end - unit_init != version_length - || memcmp(unit_init, NXT_VERSION, version_length) != 0) - { - nxt_unit_alert(NULL, "version check error"); + rc = version_length != nxt_length(NXT_VERSION) + || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); + + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " + "%.*s, while the app was compiled with libunit %s", + (int) version_length, unit_init, NXT_VERSION); return NXT_UNIT_ERROR; } - rc = sscanf(version_end + 1, + vars = version_end + 1; + + rc = sscanf(vars, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" @@ -823,12 +832,22 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, &read_pid, &read_id, &read_in_fd, &read_out_fd, log_fd, shm_limit); + if (nxt_slow_path(rc == EOF)) { + nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", + vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); + + return NXT_UNIT_ERROR; + } + if (nxt_slow_path(rc != 13)) { - nxt_unit_alert(NULL, "failed to scan variables: %d", rc); + nxt_unit_alert(NULL, "invalid number of variables in %s env: " + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); return NXT_UNIT_ERROR; } + nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); + nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); ready_port->in_fd = -1; -- cgit From 55296e6ff2613a0b2ec588beaf01620b2679c3d1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 30 Nov 2020 23:30:20 +0300 Subject: Node.js: removing unnecessary warnings. Warnings changed for debug messages. --- src/nodejs/unit-http/unit.cpp | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index c5bca49a..b166707e 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -998,33 +998,21 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info) void -Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "conn_destroy: %p", req); + nxt_unit_req_debug((nxt_unit_request_info_t *) r, "conn_destroy: %p", r); } void -Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "sock_destroy: %p", req); + nxt_unit_req_debug((nxt_unit_request_info_t *) r, "sock_destroy: %p", r); } void -Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) +Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_request_info_t *req; - - req = (nxt_unit_request_info_t *) nativeObject; - - nxt_unit_warn(NULL, "resp_destroy: %p", req); + nxt_unit_req_debug((nxt_unit_request_info_t *) r, "resp_destroy: %p", r); } -- cgit From db42527b1b2656141af0d8280e59e23be6af67d6 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 7 Dec 2020 17:56:18 +0300 Subject: Node.js: avoided use of request struct for debug logging. This fixes a crash on exit of Node.js application. The crash reproduced on Ubuntu 20.10 with Node.js v15.1.0. Tests 'test_node_websockets_two_clients' and 'test_node_websockets_7_13_1__7_13_2'. The reason of the crash is using request struct which was already freed. The issue was introduced in 5be509fda29e. --- src/nodejs/unit-http/unit.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index b166707e..0e049865 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -1000,19 +1000,19 @@ Unit::websocket_set_sock(napi_env env, napi_callback_info info) void Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_req_debug((nxt_unit_request_info_t *) r, "conn_destroy: %p", r); + nxt_unit_req_debug(NULL, "conn_destroy: %p", r); } void Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_req_debug((nxt_unit_request_info_t *) r, "sock_destroy: %p", r); + nxt_unit_req_debug(NULL, "sock_destroy: %p", r); } void Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) { - nxt_unit_req_debug((nxt_unit_request_info_t *) r, "resp_destroy: %p", r); + nxt_unit_req_debug(NULL, "resp_destroy: %p", r); } -- cgit From bda76b04e9aec295c7fa11c7eacb6e11c4bf67f4 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Mon, 7 Dec 2020 18:50:56 +0300 Subject: HTTP: fixed status line format for unknown status codes. According to Section #3.1.2 of RFC 7230, after the status code there must be a space even if the reason phrase is empty. Also, only 3 digits allowed. This closes #507 issue on GitHub. --- src/nxt_h1proto.c | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index dccbe56c..6aef264c 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -1151,19 +1151,19 @@ static const nxt_str_t nxt_http_client_error[] = { nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"), nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"), nxt_string("HTTP/1.1 417 Expectation Failed\r\n"), - nxt_string("HTTP/1.1 418\r\n"), - nxt_string("HTTP/1.1 419\r\n"), - nxt_string("HTTP/1.1 420\r\n"), - nxt_string("HTTP/1.1 421\r\n"), - nxt_string("HTTP/1.1 422\r\n"), - nxt_string("HTTP/1.1 423\r\n"), - nxt_string("HTTP/1.1 424\r\n"), - nxt_string("HTTP/1.1 425\r\n"), + nxt_string("HTTP/1.1 418 I'm a teapot\r\n"), + nxt_string("HTTP/1.1 419 \r\n"), + nxt_string("HTTP/1.1 420 \r\n"), + nxt_string("HTTP/1.1 421 Misdirected Request\r\n"), + nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"), + nxt_string("HTTP/1.1 423 Locked\r\n"), + nxt_string("HTTP/1.1 424 Failed Dependency\r\n"), + nxt_string("HTTP/1.1 425 \r\n"), nxt_string("HTTP/1.1 426 Upgrade Required\r\n"), - nxt_string("HTTP/1.1 427\r\n"), - nxt_string("HTTP/1.1 428\r\n"), - nxt_string("HTTP/1.1 429\r\n"), - nxt_string("HTTP/1.1 430\r\n"), + nxt_string("HTTP/1.1 427 \r\n"), + nxt_string("HTTP/1.1 428 \r\n"), + nxt_string("HTTP/1.1 429 \r\n"), + nxt_string("HTTP/1.1 430 \r\n"), nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"), }; @@ -1190,7 +1190,7 @@ static const nxt_str_t nxt_http_server_error[] = { }; -#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 65536\r\n") +#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 999 \r\n") static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, @@ -1248,13 +1248,16 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, { status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR]; - } else { - p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH, - "HTTP/1.1 %03d\r\n", n); + } else if (n <= NXT_HTTP_STATUS_MAX) { + (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH, + "HTTP/1.1 %03d \r\n", n); - unknown_status.length = p - buf; + unknown_status.length = UNKNOWN_STATUS_LENGTH; unknown_status.start = buf; status = &unknown_status; + + } else { + status = &nxt_http_server_error[0]; } size = status->length; -- cgit From 2348229dc7656f36a7915d85af56aae9ed9fb120 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Tue, 8 Dec 2020 01:59:46 +0300 Subject: PHP: populating PHP_AUTH_* server variables. This closes #498 issue on GitHub. --- src/nxt_h1proto.c | 2 ++ src/nxt_http.h | 1 + src/nxt_php_sapi.c | 11 +++++++++++ src/nxt_router.c | 4 ++++ src/nxt_unit_request.h | 1 + 5 files changed, 19 insertions(+) (limited to 'src') diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 6aef264c..d3da6942 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -174,6 +174,8 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Content-Type"), &nxt_http_request_field, offsetof(nxt_http_request_t, content_type) }, { nxt_string("Content-Length"), &nxt_http_request_content_length, 0 }, + { nxt_string("Authorization"), &nxt_http_request_field, + offsetof(nxt_http_request_t, authorization) }, }; diff --git a/src/nxt_http.h b/src/nxt_http.h index 1418be95..e30bfeb4 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -156,6 +156,7 @@ struct nxt_http_request_s { nxt_http_field_t *cookie; nxt_http_field_t *referer; nxt_http_field_t *user_agent; + nxt_http_field_t *authorization; nxt_off_t content_length_n; nxt_sockaddr_t *remote; diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index d2fbdd27..369e7f32 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -1038,6 +1038,17 @@ nxt_php_execute(nxt_php_run_ctx_t *ctx, nxt_unit_request_t *r) ctx->cookie = nxt_unit_sptr_get(&f->value); } + if (r->authorization_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->authorization_field; + + php_handle_auth_data(nxt_unit_sptr_get(&f->value)); + + } else { + SG(request_info).auth_digest = NULL; + SG(request_info).auth_user = NULL; + SG(request_info).auth_password = NULL; + } + SG(sapi_headers).http_response_code = 200; SG(request_info).path_translated = NULL; diff --git a/src/nxt_router.c b/src/nxt_router.c index 9dd5c30e..871602e4 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5169,6 +5169,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, req->content_length_field = NXT_UNIT_NONE_FIELD; req->content_type_field = NXT_UNIT_NONE_FIELD; req->cookie_field = NXT_UNIT_NONE_FIELD; + req->authorization_field = NXT_UNIT_NONE_FIELD; dst_field = req->fields; @@ -5193,6 +5194,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, } else if (field == r->cookie) { req->cookie_field = dst_field - req->fields; + + } else if (field == r->authorization) { + req->authorization_field = dst_field - req->fields; } nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", diff --git a/src/nxt_unit_request.h b/src/nxt_unit_request.h index fede00d2..5dbf648d 100644 --- a/src/nxt_unit_request.h +++ b/src/nxt_unit_request.h @@ -31,6 +31,7 @@ struct nxt_unit_request_s { uint32_t content_length_field; uint32_t content_type_field; uint32_t cookie_field; + uint32_t authorization_field; uint64_t content_length; -- cgit From d3796d1fb7008629a8fa505481dab96efe60cbdb Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 7 Dec 2020 18:17:25 +0000 Subject: Ruby: fixed crash on thread start. Ruby threads need to be created with GVL; otherwise, an attempt to access locked resources may occur, causing a crash. The issue was occasionally reproduced on Ubuntu 18.04 with Ruby 2.5.1 while running test_ruby_application_threads. --- src/ruby/nxt_ruby.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 698d4a43..0aad887d 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -38,6 +38,7 @@ static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx); static void nxt_ruby_request_handler(nxt_unit_request_info_t *req); static void *nxt_ruby_request_handler_gvl(void *req); static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_ruby_thread_create_gvl(void *rctx); static VALUE nxt_ruby_thread_func(VALUE arg); static void *nxt_ruby_unit_run(void *ctx); static void nxt_ruby_ubf(void *ctx); @@ -1141,7 +1142,7 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) rctx->ctx = ctx; - res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx); + res = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl, rctx); if (nxt_fast_path(res != Qnil)) { nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); @@ -1159,6 +1160,17 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) } +static void * +nxt_ruby_thread_create_gvl(void *rctx) +{ + VALUE res; + + res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx); + + return (void *) (uintptr_t) res; +} + + static VALUE nxt_ruby_thread_func(VALUE arg) { -- cgit From 1e9def50c8ecc9f9331908b5fd46b218019a0fb0 Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Mon, 14 Dec 2020 12:00:28 +0000 Subject: Isolation: fixed unmounting when mnt namespace is in place. The code had a wrong assumption that "mount namespaces" automatically unmounts process mounts when exits but this happens only with unprivileged mounts. --- src/nxt_isolation.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'src') diff --git a/src/nxt_isolation.c b/src/nxt_isolation.c index 1e6323bc..cab0074b 100644 --- a/src/nxt_isolation.c +++ b/src/nxt_isolation.c @@ -676,12 +676,6 @@ nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process) return; } -#if (NXT_HAVE_CLONE_NEWNS) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) { - return; - } -#endif - nxt_debug(task, "unmount all (%s)", process->name); automount = &process->isolation.automount; -- cgit From 8d65a3303bde4fa2725310cd38af311e503e75ae Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Mon, 14 Dec 2020 17:15:49 +0300 Subject: Python: WSGI environment copying moved out of request processing. The WSGI environment dictionary contains a number of static items, that are pre-initialized on application start. Then it's copied for each request to be filled with request-related data. Now this dictionary copy operation will be done between processing of requests, which should save some CPU cycles during request processing and thus reduce response latency for non-peak load periods. --- src/python/nxt_python_wsgi.c | 65 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c index da7b183c..77c45af5 100644 --- a/src/python/nxt_python_wsgi.c +++ b/src/python/nxt_python_wsgi.c @@ -59,6 +59,7 @@ static void nxt_python_wsgi_done(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); static PyObject *nxt_python_create_environ(nxt_python_app_conf_t *c); +static PyObject *nxt_python_copy_environ(nxt_unit_request_info_t *req); static PyObject *nxt_python_get_environ(nxt_python_ctx_t *pctx); static int nxt_python_add_sptr(nxt_python_ctx_t *pctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); @@ -221,6 +222,7 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata) } pctx->write = NULL; + pctx->environ = NULL; pctx->start_resp = PyCFunction_New(nxt_py_start_resp_method, (PyObject *) pctx); @@ -237,6 +239,11 @@ nxt_python_wsgi_ctx_data_alloc(void **pdata) goto fail; } + pctx->environ = nxt_python_copy_environ(NULL); + if (nxt_slow_path(pctx->environ == NULL)) { + goto fail; + } + *pdata = pctx; return NXT_UNIT_OK; @@ -258,6 +265,7 @@ nxt_python_wsgi_ctx_data_free(void *data) Py_XDECREF(pctx->start_resp); Py_XDECREF(pctx->write); + Py_XDECREF(pctx->environ); Py_XDECREF(pctx); } @@ -295,6 +303,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) int rc; PyObject *environ, *args, *response, *iterator, *item; PyObject *close, *result; + nxt_bool_t prepare_environ; nxt_python_ctx_t *pctx; pctx = req->ctx->data; @@ -305,6 +314,19 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) PyEval_RestoreThread(pctx->thread_state); + if (nxt_slow_path(pctx->environ == NULL)) { + pctx->environ = nxt_python_copy_environ(req); + + if (pctx->environ == NULL) { + prepare_environ = 0; + + rc = NXT_UNIT_ERROR; + goto done; + } + } + + prepare_environ = 1; + environ = nxt_python_get_environ(pctx); if (nxt_slow_path(environ == NULL)) { rc = NXT_UNIT_ERROR; @@ -418,6 +440,14 @@ done: pctx->req = NULL; nxt_unit_request_done(req, rc); + + if (nxt_fast_path(prepare_environ)) { + PyEval_RestoreThread(pctx->thread_state); + + pctx->environ = nxt_python_copy_environ(NULL); + + pctx->thread_state = PyEval_SaveThread(); + } } @@ -532,23 +562,30 @@ fail: static PyObject * -nxt_python_get_environ(nxt_python_ctx_t *pctx) +nxt_python_copy_environ(nxt_unit_request_info_t *req) { - int rc; - uint32_t i, j, vl; - PyObject *environ; - nxt_unit_field_t *f, *f2; - nxt_unit_request_t *r; + PyObject *environ; environ = PyDict_Copy(nxt_py_environ_ptyp); + if (nxt_slow_path(environ == NULL)) { - nxt_unit_req_error(pctx->req, + nxt_unit_req_alert(req, "Python failed to copy the \"environ\" dictionary"); - - return NULL; + nxt_python_print_exception(); } - pctx->environ = environ; + return environ; +} + + +static PyObject * +nxt_python_get_environ(nxt_python_ctx_t *pctx) +{ + int rc; + uint32_t i, j, vl; + PyObject *environ; + nxt_unit_field_t *f, *f2; + nxt_unit_request_t *r; r = pctx->req->request; @@ -628,7 +665,7 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx) #undef RC - if (nxt_slow_path(PyDict_SetItem(environ, nxt_py_wsgi_input_str, + if (nxt_slow_path(PyDict_SetItem(pctx->environ, nxt_py_wsgi_input_str, (PyObject *) pctx) != 0)) { nxt_unit_req_error(pctx->req, @@ -636,11 +673,15 @@ nxt_python_get_environ(nxt_python_ctx_t *pctx) goto fail; } + environ = pctx->environ; + pctx->environ = NULL; + return environ; fail: - Py_DECREF(environ); + Py_DECREF(pctx->environ); + pctx->environ = NULL; return NULL; } -- cgit From c0449e13f80312a2e09f643e1a69e536384eae79 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 17 Dec 2020 19:27:44 +0300 Subject: Router: fixed crash in OOSM processing. Multithreaded application may create different shared memory segments in different threads. The segments then passed to different router threads. Because of this multithreading, the order of adding incoming segments is not determined and there can be situation when some of the incoming segments are not initialized yet. This patch simply adds check for NULL to skip non-initialized segments. Crash reproduced during load tests with high number of simultaneous connections (1024 and more). --- src/nxt_router.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nxt_router.c b/src/nxt_router.c index 871602e4..0416dea0 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5373,7 +5373,7 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_bool_t ack; nxt_process_t *process; nxt_free_map_t *m; - nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "oosm in %PI", msg->port_msg.pid); @@ -5394,8 +5394,13 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_thread_mutex_lock(&process->incoming.mutex); for (i = 0; i < process->incoming.size; i++) { - hdr = process->incoming.elts[i].mmap_handler->hdr; - m = hdr->free_map; + mmap_handler = process->incoming.elts[i].mmap_handler; + + if (nxt_slow_path(mmap_handler == NULL)) { + continue; + } + + m = mmap_handler->hdr->free_map; for (mi = 0; mi < MAX_FREE_IDX; mi++) { if (m[mi] != 0) { -- cgit From 7389a50835696fe256c5decf31bec129f1d59bbf Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 18 Dec 2020 00:25:27 +0300 Subject: Limiting app queue notifications count in socket. Under high load, a queue synchonization issue may occur, starting from the steady state when an app queue message is dequeued immediately after it has been enqueued. In this state, the router always puts the first message in the queue and is forced to notify the app about a new message in an empty queue using a socket pair. On the other hand, the application dequeues and processes the message without reading the notification from the socket, so the socket buffer overflows with notifications. The issue was reproduced during Unit load tests. After a socket buffer overflow, the router is unable to notify the app about a new first message. When another message is enqueued, a notification is not required, so the queue grows without being read by the app. As a result, request processing stops. This patch changes the notification algorithm by counting the notifications in the pipe instead of getting the number of messages in the queue. --- src/nxt_app_queue.h | 18 ++++++++++++------ src/nxt_unit.c | 7 ++++++- 2 files changed, 18 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h index 127cb8f3..a1cc2f11 100644 --- a/src/nxt_app_queue.h +++ b/src/nxt_app_queue.h @@ -23,7 +23,7 @@ typedef struct { typedef struct { - nxt_app_nncq_atomic_t nitems; + nxt_app_nncq_atomic_t notified; nxt_app_nncq_t free_items; nxt_app_nncq_t queue; nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE]; @@ -42,7 +42,7 @@ nxt_app_queue_init(nxt_app_queue_t volatile *q) nxt_app_nncq_enqueue(&q->free_items, i); } - q->nitems = 0; + q->notified = 0; } @@ -50,6 +50,7 @@ nxt_inline nxt_int_t nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p, uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie) { + int n; nxt_app_queue_item_t *qi; nxt_app_nncq_atomic_t i; @@ -67,16 +68,23 @@ nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p, nxt_app_nncq_enqueue(&q->queue, i); - i = nxt_atomic_fetch_add(&q->nitems, 1); + n = nxt_atomic_cmp_set(&q->notified, 0, 1); if (notify != NULL) { - *notify = (i == 0); + *notify = n; } return NXT_OK; } +nxt_inline void +nxt_app_queue_notification_received(nxt_app_queue_t volatile *q) +{ + q->notified = 0; +} + + nxt_inline nxt_bool_t nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie, uint32_t tracking) @@ -110,8 +118,6 @@ nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie) nxt_app_nncq_enqueue(&q->free_items, i); - nxt_atomic_fetch_add(&q->nitems, -1); - return res; } diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b6904ce9..2cdc75f8 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -6092,7 +6092,10 @@ static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { - int res; + int res; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); retry: @@ -6105,6 +6108,8 @@ retry: } if (nxt_unit_is_read_queue(rbuf)) { + nxt_app_queue_notification_received(port_impl->queue); + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); -- cgit From 7b669ed866896afbf26ab6bc0737fe7c8f9c2ec5 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 18 Dec 2020 00:25:28 +0300 Subject: Libunit: fixed shared memory waiting. The nxt_unit_ctx_port_recv() function may return the NXT_UNIT_AGAIN code, in which case an attempt to reread the message should be made. The issue was reproduced in load testing with response sizes 16k and up. In the rare case of a NXT_UNIT_AGAIN result, a buffer of size -1 was processed, which triggered a 'message too small' alert; after that, the app process was terminated. --- src/nxt_unit.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 2cdc75f8..39e7f076 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -3606,7 +3606,10 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + do { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + } while (res == NXT_UNIT_AGAIN); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); -- cgit From cac762ab7ef22798d0f1d0813201c0018bd589a1 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Tue, 22 Dec 2020 17:53:41 +0300 Subject: Python: multiple values in the "path" option. --- src/nxt_application.h | 14 +++---- src/nxt_conf_validation.c | 35 +++++++++++++++- src/nxt_main_process.c | 2 +- src/python/nxt_python.c | 103 ++++++++++++++++++++++++++++++++++------------ 4 files changed, 118 insertions(+), 36 deletions(-) (limited to 'src') diff --git a/src/nxt_application.h b/src/nxt_application.h index 5632f56f..632c5632 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -47,13 +47,13 @@ typedef struct { typedef struct { - char *home; - nxt_str_t path; - nxt_str_t module; - char *callable; - nxt_str_t protocol; - uint32_t threads; - uint32_t thread_stack_size; + char *home; + nxt_conf_value_t *path; + nxt_str_t module; + char *callable; + nxt_str_t protocol; + uint32_t threads; + uint32_t thread_stack_size; } nxt_python_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index acb2e3de..67fa3095 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -96,6 +96,10 @@ static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_threads(nxt_conf_validation_t *vldt, @@ -491,7 +495,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { .type = NXT_CONF_VLDT_STRING, }, { .name = nxt_string("path"), - .type = NXT_CONF_VLDT_STRING, + .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + .validator = nxt_conf_vldt_python_path, }, { .name = nxt_string("module"), .type = NXT_CONF_VLDT_STRING, @@ -1376,6 +1381,34 @@ nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, } +static nxt_int_t +nxt_conf_vldt_python_path(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { + return nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_python_path_element); + } + + /* NXT_CONF_STRING */ + + return NXT_OK; +} + + +static nxt_int_t +nxt_conf_vldt_python_path_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) +{ + if (nxt_conf_type(value) != NXT_CONF_STRING) { + return nxt_conf_vldt_error(vldt, "The \"path\" array must contain " + "only string values."); + } + + return NXT_OK; +} + + static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 0cde435b..2916f0ab 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -182,7 +182,7 @@ static nxt_conf_map_t nxt_python_app_conf[] = { { nxt_string("path"), - NXT_CONF_MAP_STR, + NXT_CONF_MAP_PTR, offsetof(nxt_common_app_conf_t, u.python.path), }, diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index faf0c0e1..d8204937 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -24,6 +24,7 @@ typedef struct { static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data); +static nxt_int_t nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value); static int nxt_python_init_threads(nxt_python_app_conf_t *c); static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx); static void *nxt_python_thread_func(void *main_ctx); @@ -67,7 +68,7 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) int rc; char *nxt_py_module; size_t len; - PyObject *obj, *pypath, *module; + PyObject *obj, *module; nxt_str_t proto; const char *callable; nxt_unit_ctx_t *unit_ctx; @@ -162,38 +163,18 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) } nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + + /* obj is a Borrowed reference. */ + obj = NULL; + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { nxt_alert(task, "Python failed to get \"flush\" attribute of " "\"sys.stderr\" object"); goto fail; } - /* obj is a Borrowed reference. */ - - if (c->path.length > 0) { - obj = PyString_FromStringAndSize((char *) c->path.start, - c->path.length); - - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to create string object \"%V\"", - &c->path); - goto fail; - } - - pypath = PySys_GetObject((char *) "path"); - - if (nxt_slow_path(pypath == NULL)) { - nxt_alert(task, "Python failed to get \"sys.path\" list"); - goto fail; - } - - if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { - nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", - &c->path); - goto fail; - } - - Py_DECREF(obj); + if (nxt_slow_path(nxt_python_set_path(task, c->path) != NXT_OK)) { + goto fail; } obj = Py_BuildValue("[s]", "unit"); @@ -317,6 +298,74 @@ fail: } +static nxt_int_t +nxt_python_set_path(nxt_task_t *task, nxt_conf_value_t *value) +{ + int ret; + PyObject *path, *sys; + nxt_str_t str; + nxt_uint_t n; + nxt_conf_value_t *array; + + if (value == NULL) { + return NXT_OK; + } + + sys = PySys_GetObject((char *) "path"); + if (nxt_slow_path(sys == NULL)) { + nxt_alert(task, "Python failed to get \"sys.path\" list"); + return NXT_ERROR; + } + + /* sys is a Borrowed reference. */ + + if (nxt_conf_type(value) == NXT_CONF_STRING) { + n = 0; + goto value_is_string; + } + + /* NXT_CONF_ARRAY */ + array = value; + + n = nxt_conf_array_elements_count(array); + + while (n != 0) { + n--; + + /* + * Insertion in front of existing paths starting from the last element + * to preserve original order while giving priority to the values + * specified in the "path" option. + */ + + value = nxt_conf_get_array_element(array, n); + + value_is_string: + + nxt_conf_get_string(value, &str); + + path = PyString_FromStringAndSize((char *) str.start, str.length); + if (nxt_slow_path(path == NULL)) { + nxt_alert(task, "Python failed to create string object \"%V\"", + &str); + return NXT_ERROR; + } + + ret = PyList_Insert(sys, 0, path); + + Py_DECREF(path); + + if (nxt_slow_path(ret != 0)) { + nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", + &str); + return NXT_ERROR; + } + } + + return NXT_OK; +} + + static int nxt_python_init_threads(nxt_python_app_conf_t *c) { -- cgit From b7dba9006243f65e66d85a3a29841262c1c5dfef Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 23 Dec 2020 11:01:36 +0300 Subject: Static: fixing request memory pool leakage in router. When a static file larger than NXT_HTTP_STATIC_BUF_SIZE (128K) is served, two buffers are allocated and chained; each retains the whole request memory pool. Starting from 41331471eee7, the completion handler was called once for a linked buffer chain, but the second buffer got lost. This patch improves the completion handler's treatment of static buffers to handle all linked buffers. --- src/nxt_http_static.c | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 5687ef2c..df2655fc 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -395,12 +395,15 @@ static void nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) { ssize_t n, size; - nxt_buf_t *b, *fb; + nxt_buf_t *b, *fb, *next; nxt_off_t rest; nxt_http_request_t *r; b = obj; r = data; + +complete_buf: + fb = r->out; if (nxt_slow_path(fb == NULL || r->error)) { @@ -424,6 +427,8 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) goto clean; } + next = b->next; + if (n == rest) { nxt_file_close(task, fb->file); r->out = NULL; @@ -439,12 +444,24 @@ nxt_http_static_buf_completion(nxt_task_t *task, void *obj, void *data) b->mem.free = b->mem.pos + n; nxt_http_request_send(task, r, b); + + if (next != NULL) { + b = next; + goto complete_buf; + } + return; clean: - nxt_mp_free(r->mem_pool, b); - nxt_mp_release(r->mem_pool); + do { + next = b->next; + + nxt_mp_free(r->mem_pool, b); + nxt_mp_release(r->mem_pool); + + b = next; + } while (b != NULL); if (fb != NULL) { nxt_file_close(task, fb->file); -- cgit From d3d6864bdc64f34924e686ff65da704b29aaaa93 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 29 Dec 2020 19:00:54 +0300 Subject: Node.js: ServerRequest and ServerResponse compliance to Stream API. ServerRequest now inherit stream Readable object. ServerResponse provides 'writable' property. Thanks to Wu Jian Ping (@wujjpp). This closes #274, closes #317 issues and closes #502 PR on GitHub. --- src/nodejs/unit-http/http_server.js | 50 +++++++++---------------- src/nodejs/unit-http/unit.cpp | 75 +++++++++++++++++++++++++++++++------ src/nodejs/unit-http/unit.h | 6 ++- 3 files changed, 87 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index d378e410..e59296ae 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -11,6 +11,7 @@ const util = require('util'); const unit_lib = require('./build/Release/unit-http'); const Socket = require('./socket'); const WebSocketFrame = require('./websocket_frame'); +const Readable = require('stream').Readable; function ServerResponse(req) { @@ -23,6 +24,7 @@ function ServerResponse(req) { req._response = this; this.socket = req.socket; this.connection = req.connection; + this.writable = true; } util.inherits(ServerResponse, EventEmitter); @@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { res = this._write(chunk, 0, contentLength); if (res < contentLength) { this.socket.writable = false; + this.writable = false; o = new BufferedOutput(this, res, chunk, encoding, callback); this.server._output.push(o); @@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (typeof callback === 'function') { callback(); } + + this.emit("finish"); }); this.finished = true; @@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) { }; function ServerRequest(server, socket) { - EventEmitter.call(this); + Readable.call(this); this.server = server; this.socket = socket; this.connection = socket; + this._pushed_eofchunk = false; } -util.inherits(ServerRequest, EventEmitter); - -ServerRequest.prototype.unpipe = undefined; +util.inherits(ServerRequest, Readable); ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) { this.timeout = msecs; @@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() { return http.STATUS_CODES; }; -ServerRequest.prototype.listeners = function listeners() { - return []; -}; - -ServerRequest.prototype.resume = function resume() { - return []; -}; +ServerRequest.prototype._request_read = unit_lib.request_read; -/* - * The "on" method is overridden to defer reading data until user code is - * ready, that is (ev === "data"). This can occur after req.emit("end") is - * executed, since the user code can be scheduled asynchronously by Promises - * and so on. Passing the data is postponed by process.nextTick() until - * the "on" method caller completes. - */ -ServerRequest.prototype.on = function on(ev, fn) { - Server.prototype.on.call(this, ev, fn); +ServerRequest.prototype._read = function _read(n) { + const b = this._request_read(n); - if (ev === "data") { - process.nextTick(function () { - if (this._data.length !== 0) { - this.emit("data", this._data); - } + if (b != null) { + this.push(b); + } - }.bind(this)); + if (!this._pushed_eofchunk && (b == null || b.length < n)) { + this._pushed_eofchunk = true; + this.push(null); } }; -ServerRequest.prototype.addListener = ServerRequest.prototype.on; function Server(requestListener) { EventEmitter.call(this); @@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) { } else { this.emit("request", req, res); } - - process.nextTick(() => { - req.emit("finish"); - req.emit("end"); - }); }; Server.prototype.emit_close = function () { @@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () { } resp.socket.writable = true; + resp.writable = true; process.nextTick(() => { resp.emit("drain"); diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 0e049865..67b4377d 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -27,6 +27,7 @@ struct port_data_t { struct req_data_t { napi_ref sock_ref; + napi_ref req_ref; napi_ref resp_ref; napi_ref conn_ref; }; @@ -65,6 +66,7 @@ Unit::init(napi_env env, napi_value exports) constructor_ = napi.create_reference(ctor); napi.set_named_property(exports, "Unit", ctor); + napi.set_named_property(exports, "request_read", request_read); napi.set_named_property(exports, "response_send_headers", response_send_headers); napi.set_named_property(exports, "response_write", response_write); @@ -206,7 +208,7 @@ Unit::request_handler(nxt_unit_request_info_t *req) server_obj = get_server_object(); socket = create_socket(server_obj, req); - request = create_request(server_obj, socket); + request = create_request(server_obj, socket, req); response = create_response(server_obj, request, req); create_headers(req, request); @@ -301,6 +303,7 @@ Unit::close_handler(nxt_unit_request_info_t *req) nxt_napi::create(0)); remove_wrap(req_data->sock_ref); + remove_wrap(req_data->req_ref); remove_wrap(req_data->resp_ref); remove_wrap(req_data->conn_ref); @@ -488,9 +491,8 @@ Unit::get_server_object() void Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) { - void *data; uint32_t i; - napi_value headers, raw_headers, buffer; + napi_value headers, raw_headers; napi_status status; nxt_unit_request_t *r; @@ -515,11 +517,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) set_named_property(request, "url", r->target, r->target_length); set_named_property(request, "_websocket_handshake", r->websocket_handshake); - - buffer = create_buffer((size_t) req->content_length, &data); - nxt_unit_request_read(req, data, req->content_length); - - set_named_property(request, "_data", buffer); } @@ -577,13 +574,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) napi_value -Unit::create_request(napi_value server_obj, napi_value socket) +Unit::create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req) { - napi_value constructor; + napi_value constructor, res; + req_data_t *req_data; constructor = get_named_property(server_obj, "ServerRequest"); - return new_instance(constructor, server_obj, socket); + res = new_instance(constructor, server_obj, socket); + + req_data = (req_data_t *) req->data; + req_data->req_ref = wrap(res, req, req_destroy); + + return res; } @@ -642,6 +646,47 @@ Unit::create_websocket_frame(napi_value server_obj, } +napi_value +Unit::request_read(napi_env env, napi_callback_info info) +{ + void *data; + uint32_t wm; + nxt_napi napi(env); + napi_value this_arg, argv, buffer; + nxt_unit_request_info_t *req; + + try { + this_arg = napi.get_cb_info(info, argv); + + try { + req = napi.get_request_info(this_arg); + + } catch (exception &e) { + return nullptr; + } + + if (req->content_length == 0) { + return nullptr; + } + + wm = napi.get_value_uint32(argv); + + if (wm > req->content_length) { + wm = req->content_length; + } + + buffer = napi.create_buffer((size_t) wm, &data); + nxt_unit_request_read(req, data, wm); + + } catch (exception &e) { + napi.throw_error(e); + return nullptr; + } + + return buffer; +} + + napi_value Unit::response_send_headers(napi_env env, napi_callback_info info) { @@ -884,6 +929,7 @@ Unit::response_end(napi_env env, napi_callback_info info) req_data = (req_data_t *) req->data; napi.remove_wrap(req_data->sock_ref); + napi.remove_wrap(req_data->req_ref); napi.remove_wrap(req_data->resp_ref); napi.remove_wrap(req_data->conn_ref); @@ -1011,6 +1057,13 @@ Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) } +void +Unit::req_destroy(napi_env env, void *r, void *finalize_hint) +{ + nxt_unit_req_debug(NULL, "req_destroy: %p", r); +} + + void Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) { diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 07823c26..4ef40d45 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -21,6 +21,7 @@ private: static void destroy(napi_env env, void *nativeObject, void *finalize_hint); static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint); + static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint); static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint); static napi_value create_server(napi_env env, napi_callback_info info); @@ -50,7 +51,8 @@ private: napi_value create_socket(napi_value server_obj, nxt_unit_request_info_t *req); - napi_value create_request(napi_value server_obj, napi_value socket); + napi_value create_request(napi_value server_obj, napi_value socket, + nxt_unit_request_info_t *req); napi_value create_response(napi_value server_obj, napi_value request, nxt_unit_request_info_t *req); @@ -58,6 +60,8 @@ private: napi_value create_websocket_frame(napi_value server_obj, nxt_unit_websocket_frame_t *ws); + static napi_value request_read(napi_env env, napi_callback_info info); + static napi_value response_send_headers(napi_env env, napi_callback_info info); -- cgit From d65a66f9d813294917822554311281c5e1a7126b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 29 Dec 2020 19:01:24 +0300 Subject: Libunit: processing single port message. This partially reverts the optimisation introduced in 1d84b9e4b459 to avoid an unpredictable block in nxt_unit_process_port_msg(). Under high load, this function may never return control to its caller, and the external event loop (in Node.js and Python asyncio) won't be able to process other scheduled events. To reproduce the issue, two request processing types are needed: 'fast' and 'furious'. The 'fast' one simply returns a small response, while the 'furious' schedules asynchronous calls to external resources. Thus, if Unit is subjected to a large amount of 'fast' requests, the 'furious' request processing freezes until the high load ends. The issue was found by Wu Jian Ping (@wujjpp) during Node.js stream implementation discussion and relates to PR #502 on GitHub. --- src/nodejs/unit-http/unit.cpp | 176 +++++++++++++++++++++++++++++++----------- src/nxt_unit.c | 13 ---- src/python/nxt_python_asgi.c | 41 ++++++---- 3 files changed, 161 insertions(+), 69 deletions(-) (limited to 'src') diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 67b4377d..589eca3f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -13,15 +13,29 @@ #include -static void delete_port_data(uv_handle_t* handle); - napi_ref Unit::constructor_; struct port_data_t { - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; - uv_poll_t poll; + port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); + + void process_port_msg(); + void stop(); + + template + static port_data_t *get(T *handle); + + static void read_callback(uv_poll_t *handle, int status, int events); + static void timer_callback(uv_timer_t *handle); + static void delete_data(uv_handle_t* handle); + + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + uv_poll_t poll; + uv_timer_t timer; + int ref_count; + bool scheduled; + bool stopped; }; @@ -33,6 +47,106 @@ struct req_data_t { }; +port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : + ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) +{ + timer.type = UV_UNKNOWN_HANDLE; +} + + +void +port_data_t::process_port_msg() +{ + int rc, err; + + rc = nxt_unit_process_port_msg(ctx, port); + + if (rc != NXT_UNIT_OK) { + return; + } + + if (timer.type == UV_UNKNOWN_HANDLE) { + err = uv_timer_init(poll.loop, &timer); + if (err < 0) { + nxt_unit_warn(ctx, "Failed to init uv.poll"); + return; + } + + ref_count++; + timer.data = this; + } + + if (!scheduled && !stopped) { + uv_timer_start(&timer, timer_callback, 0, 0); + + scheduled = true; + } +} + + +void +port_data_t::stop() +{ + stopped = true; + + uv_poll_stop(&poll); + + uv_close((uv_handle_t *) &poll, delete_data); + + if (timer.type == UV_UNKNOWN_HANDLE) { + return; + } + + uv_timer_stop(&timer); + + uv_close((uv_handle_t *) &timer, delete_data); +} + + +template +port_data_t * +port_data_t::get(T *handle) +{ + return (port_data_t *) handle->data; +} + + +void +port_data_t::read_callback(uv_poll_t *handle, int status, int events) +{ + get(handle)->process_port_msg(); +} + + +void +port_data_t::timer_callback(uv_timer_t *handle) +{ + port_data_t *data; + + data = get(handle); + + data->scheduled = false; + if (data->stopped) { + return; + } + + data->process_port_msg(); +} + + +void +port_data_t::delete_data(uv_handle_t* handle) +{ + port_data_t *data; + + data = get(handle); + + if (--data->ref_count <= 0) { + delete data; + } +} + + Unit::Unit(napi_env env, napi_value jsthis): nxt_napi(env), wrapper_(wrap(jsthis, this, destroy)), @@ -353,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) } -static void -nxt_uv_read_callback(uv_poll_t *handle, int status, int events) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - nxt_unit_process_port_msg(data->ctx, data->port); -} - - int Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int err; - Unit *obj; - uv_loop_t *loop; - port_data_t *data; - napi_status status; + int err; + Unit *obj; + uv_loop_t *loop; + port_data_t *data; + napi_status status; if (port->in_fd != -1) { - obj = reinterpret_cast(ctx->unit->data); - if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", port->in_fd, strerror(errno), errno); return -1; } + obj = reinterpret_cast(ctx->unit->data); + status = napi_get_uv_event_loop(obj->env(), &loop); if (status != napi_ok) { nxt_unit_warn(ctx, "Failed to get uv.loop"); return NXT_UNIT_ERROR; } - data = new port_data_t; + data = new port_data_t(ctx, port); err = uv_poll_init(loop, &data->poll, port->in_fd); if (err < 0) { nxt_unit_warn(ctx, "Failed to init uv.poll"); + delete data; return NXT_UNIT_ERROR; } - err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); + err = uv_poll_start(&data->poll, UV_READABLE, + port_data_t::read_callback); if (err < 0) { nxt_unit_warn(ctx, "Failed to start uv.poll"); + delete data; return NXT_UNIT_ERROR; } port->data = data; - data->ctx = ctx; - data->port = port; + data->ref_count++; data->poll.data = data; } @@ -421,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) if (port->data != NULL) { data = (port_data_t *) port->data; - if (data->port == port) { - uv_poll_stop(&data->poll); - - uv_close((uv_handle_t *) &data->poll, delete_port_data); - } + data->stop(); } } -static void -delete_port_data(uv_handle_t* handle) -{ - port_data_t *data; - - data = (port_data_t *) handle->data; - - delete data; -} - - void Unit::quit_cb(nxt_unit_ctx_t *ctx) { diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 39e7f076..2fef17c5 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -5016,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { @@ -5024,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - -retry: if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5052,15 +5048,6 @@ retry: nxt_unit_process_ready_req(ctx); - if (ctx_impl->online) { - rbuf = nxt_unit_read_buf_get(ctx); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; - } - - goto retry; - } - return rc; } diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 98aeedf4..a6f94507 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -1131,11 +1131,12 @@ nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) static PyObject * nxt_py_asgi_port_read(PyObject *self, PyObject *args) { - int rc; - PyObject *arg; - Py_ssize_t n; - nxt_unit_ctx_t *ctx; - nxt_unit_port_t *port; + int rc; + PyObject *arg0, *arg1, *res; + Py_ssize_t n; + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + nxt_py_asgi_ctx_data_t *ctx_data; n = PyTuple_GET_SIZE(args); @@ -1147,31 +1148,45 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args) return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); } - arg = PyTuple_GET_ITEM(args, 0); - if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + arg0 = PyTuple_GET_ITEM(args, 0); + if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) { return PyErr_Format(PyExc_TypeError, "the first argument is not a long"); } - ctx = PyLong_AsVoidPtr(arg); + ctx = PyLong_AsVoidPtr(arg0); - arg = PyTuple_GET_ITEM(args, 1); - if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + arg1 = PyTuple_GET_ITEM(args, 1); + if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) { return PyErr_Format(PyExc_TypeError, "the second argument is not a long"); } - port = PyLong_AsVoidPtr(arg); - - nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); + port = PyLong_AsVoidPtr(arg1); rc = nxt_unit_process_port_msg(ctx, port); + nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return PyErr_Format(PyExc_RuntimeError, "error processing port %d message", port->id.id); } + if (rc == NXT_UNIT_OK) { + ctx_data = ctx->data; + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, + nxt_py_port_read, + arg0, arg1, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + } + Py_RETURN_NONE; } -- cgit From 9b76505bf7fe7954ebd66e23fc29dab2e6b02e47 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 25 Jan 2021 13:13:17 +0300 Subject: Router: fixing assertion in shortage of file descriptors. Each application in router process required fd for a request queue shared memory. When the number of file descripts close to the limit, and port sockets successfully opened, router needs to properly handle the errors. This patch closes port sockets before destroying port structure to avoid file descriptors leakage and assertion in debug build. --- src/nxt_router.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src') diff --git a/src/nxt_router.c b/src/nxt_router.c index 0416dea0..8d6e493d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1586,6 +1586,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, ret = nxt_router_app_queue_init(task, port); if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_write_close(port); + nxt_port_read_close(port); nxt_port_use(task, port, -1); return NXT_ERROR; } -- cgit From 3855f1c032cb52f4e8f370f639d259b7cc313939 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 27 Jan 2021 17:32:03 +0300 Subject: Router: fixing error handling in config request. The controller process awaits the response from the router for every configration change request. This patch adds error reporting for various error conditions which may happen because of file descriptors or memory shortage. Lack of a response lead to the controller awaiting the response, thus being unable to process other client reconfiguration requests that also became stuck. --- src/nxt_router.c | 61 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/nxt_router.c b/src/nxt_router.c index 8d6e493d..d9c722dd 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -699,26 +699,39 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void *p; size_t size; nxt_int_t ret; + nxt_port_t *port; nxt_router_temp_conf_t *tmcf; + port = nxt_runtime_port_find(task->thread->runtime, + msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "conf_data_handler: reply port not found"); + return; + } + + p = MAP_FAILED; + + /* + * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be + * initialized in 'cleanup' section. + */ + size = 0; + tmcf = nxt_router_temp_conf(task); if (nxt_slow_path(tmcf == NULL)) { - return; + goto fail; } if (nxt_slow_path(msg->fd[0] == -1)) { - nxt_alert(task, "conf_data_handler: invalid file shm fd"); - return; + nxt_alert(task, "conf_data_handler: invalid shm fd"); + goto fail; } if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", (int) nxt_buf_mem_used_size(&msg->buf->mem)); - - nxt_fd_close(msg->fd[0]); - msg->fd[0] = -1; - - return; + goto fail; } nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); @@ -729,22 +742,14 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->fd[0] = -1; if (nxt_slow_path(p == MAP_FAILED)) { - return; + goto fail; } nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); tmcf->router_conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; - tmcf->port = nxt_runtime_port_find(task->thread->runtime, - msg->port_msg.pid, - msg->port_msg.reply_port); - - if (nxt_slow_path(tmcf->port == NULL)) { - nxt_alert(task, "reply port not found"); - - goto fail; - } + tmcf->port = port; nxt_port_use(task, tmcf->port, 1); @@ -757,9 +762,27 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_conf_error(task, tmcf); } + goto cleanup; + fail: - nxt_mem_munmap(p, size); + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, + msg->port_msg.stream, 0, NULL); + + if (tmcf != NULL) { + nxt_mp_destroy(tmcf->mem_pool); + } + +cleanup: + + if (p != MAP_FAILED) { + nxt_mem_munmap(p, size); + } + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + } } -- cgit From e4e444b82701de0c984a72eb9c2657f72d7171ae Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 28 Jan 2021 17:13:52 +0300 Subject: Router: fixing crash after WebSocket processing. After WebSocket processing, the application port was released with incorrect reason ("got request"), unnecessarily decrementing the active request counter. The assertion was triggered only on application removal; a test was added for this case. --- src/nxt_router.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/nxt_router.c b/src/nxt_router.c index d9c722dd..03fe2a6c 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -3796,7 +3796,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, nxt_http_buf_last(r)); req_rpc_data->rpc_cancel = 0; - req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + + if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) { + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + } nxt_request_rpc_data_unlink(task, req_rpc_data); -- cgit From 46a8c98a1aa6e0c4b86a72b8f8fe47878012afb1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 28 Jan 2021 18:32:12 +0300 Subject: Removing unused mutex from nxt_process_t. --- src/nxt_process.h | 2 -- src/nxt_runtime.c | 2 -- 2 files changed, 4 deletions(-) (limited to 'src') diff --git a/src/nxt_process.h b/src/nxt_process.h index 7afb8803..4f24b179 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -107,8 +107,6 @@ struct nxt_process_s { nxt_port_mmaps_t incoming; - nxt_thread_mutex_t cp_mutex; - uint32_t stream; nxt_mp_t *mem_pool; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index d9d986da..8a86d38a 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1387,7 +1387,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); nxt_thread_mutex_create(&process->incoming.mutex); - nxt_thread_mutex_create(&process->cp_mutex); process->use_count = 1; @@ -1408,7 +1407,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_mmaps_destroy(&process->incoming, 1); nxt_thread_mutex_destroy(&process->incoming.mutex); - nxt_thread_mutex_destroy(&process->cp_mutex); /* processes from nxt_runtime_process_get() have no memory pool */ if (process->mem_pool != NULL) { -- cgit From 93ac087e9684c63f82df36f847bf9239e2eb185e Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Mon, 1 Feb 2021 18:55:49 +0300 Subject: Fixed building by GCC 10 with -flto and -O2. This closes #467 issue on GitHub. --- src/nxt_http_route.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 9aaa708e..28545fc9 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -813,6 +813,12 @@ nxt_http_route_ruleset_create(nxt_task_t *task, nxt_mp_t *mp, next = 0; + /* + * A workaround for GCC 10 with -flto -O2 flags that warns about "name" + * may be uninitialized in nxt_http_route_rule_name_create(). + */ + nxt_str_null(&name); + for (i = 0; i < n; i++) { rule_cv = nxt_conf_next_object_member(ruleset_cv, &name, &next); -- cgit From 8c88537e6ee0c0a2ae1c323b8cce09522240471b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 3 Feb 2021 23:23:06 +0300 Subject: Using shared memory to pass configuration to main process. This patch is required to remove fragmented messages functionality. --- src/nxt_controller.c | 37 +++++++++++++++++++++++---- src/nxt_main_process.c | 69 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 84 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 9a34a877..772d10c8 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -1686,7 +1686,10 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, static void nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) { + void *mem; + u_char *end; size_t size; + nxt_fd_t fd; nxt_buf_t *b; nxt_port_t *main_port; nxt_runtime_t *rt; @@ -1697,14 +1700,38 @@ nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) size = nxt_conf_json_length(conf, NULL); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); + fd = nxt_shm_open(task, size); + if (nxt_slow_path(fd == -1)) { + return; + } + + mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + goto fail; + } + + end = nxt_conf_json_print(mem, conf, NULL); + + nxt_mem_munmap(mem, size); - if (nxt_fast_path(b != NULL)) { - b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); + size = end - (u_char *) mem; - (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, - -1, 0, -1, b); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0); + if (nxt_slow_path(b == NULL)) { + goto fail; } + + b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t)); + + (void) nxt_port_socket_write(task, main_port, + NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD, + fd, 0, -1, b); + + return; + +fail: + + nxt_fd_close(fd); } diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 2916f0ab..9a78f9da 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -1408,12 +1408,45 @@ nxt_app_lang_compare(const void *v1, const void *v2) static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - ssize_t n, size, offset; - nxt_buf_t *b; + void *p; + size_t size; + ssize_t n; nxt_int_t ret; nxt_file_t file; nxt_runtime_t *rt; + p = MAP_FAILED; + + /* + * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be + * initialized in 'cleanup' section. + */ + size = 0; + + if (nxt_slow_path(msg->fd[0] == -1)) { + nxt_alert(task, "conf_store_handler: invalid shm fd"); + goto error; + } + + if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { + nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)", + (int) nxt_buf_mem_used_size(&msg->buf->mem)); + goto error; + } + + nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); + + p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); + + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + + if (nxt_slow_path(p == MAP_FAILED)) { + goto error; + } + + nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); + nxt_memzero(&file, sizeof(nxt_file_t)); rt = task->thread->runtime; @@ -1427,33 +1460,35 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto error; } - offset = 0; - - for (b = msg->buf; b != NULL; b = b->next) { - size = nxt_buf_mem_used_size(&b->mem); - - n = nxt_file_write(&file, b->mem.pos, size, offset); + n = nxt_file_write(&file, p, size, 0); - if (nxt_slow_path(n != size)) { - nxt_file_close(task, &file); - (void) nxt_file_delete(file.name); - goto error; - } + nxt_file_close(task, &file); - offset += n; + if (nxt_slow_path(n != (ssize_t) size)) { + (void) nxt_file_delete(file.name); + goto error; } - nxt_file_close(task, &file); - ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); if (nxt_fast_path(ret == NXT_OK)) { - return; + goto cleanup; } error: nxt_alert(task, "failed to store current configuration"); + +cleanup: + + if (p != MAP_FAILED) { + nxt_mem_munmap(p, size); + } + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; + } } -- cgit From b1685dbc769a1b62eedc3249697c30d8770fc8c3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 3 Feb 2021 23:23:17 +0300 Subject: Fixing possible NULL dereference. For listen socket request reply port can be NULL if Router crashes immediately after issuing the request. Found by Coverity (CID 366310). --- src/nxt_main_process.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 9a78f9da..f20f2c2c 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -1001,21 +1001,22 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_listening_socket_t ls; u_char message[2048]; + port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + return; + } + b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; /* TODO check b size and make plain */ - out = NULL; - ls.socket = -1; ls.error = NXT_SOCKET_ERROR_SYSTEM; ls.start = message; ls.end = message + sizeof(message); - port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, - msg->port_msg.reply_port); - nxt_debug(task, "listening socket \"%*s\"", (size_t) sa->length, nxt_sockaddr_start(sa)); @@ -1025,6 +1026,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "socket(\"%*s\"): %d", (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket); + out = NULL; + type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD; } else { @@ -1034,13 +1037,11 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size + 1); - if (nxt_slow_path(out == NULL)) { - return; - } - - *out->mem.free++ = (uint8_t) ls.error; + if (nxt_fast_path(out != NULL)) { + *out->mem.free++ = (uint8_t) ls.error; - out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); + out->mem.free = nxt_cpymem(out->mem.free, ls.start, size); + } type = NXT_PORT_MSG_RPC_ERROR; } -- cgit From 75a5dcfc4ec4f92a196c0cf3a187081a238a6b1d Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 3 Feb 2021 23:23:28 +0300 Subject: Fixing shared app queue unmap size. Shared app queue takes more memory than port memory. To unmap all memory pages correct size need to be specified for munmap() call. Otherwise 4 Mb memory leaked on each configured application removal. The issue was introduced in 1d84b9e4b459. --- src/nxt_port.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/nxt_port.c b/src/nxt_port.c index dbcdec11..d4e46564 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -84,6 +85,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, void nxt_port_close(nxt_task_t *task, nxt_port_t *port) { + size_t size; + nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, port->id, port->type); @@ -109,7 +112,10 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) } if (port->queue != NULL) { - nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t); + nxt_mem_munmap(port->queue, size); + port->queue = NULL; } } -- cgit