From 9bacd21405de021fa846bac95b7e3fb796763a80 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Sep 2019 18:30:58 +0300 Subject: Protecting context structures with mutex. By design, Unit context is created for the thread which reads messages from the router. However, Go request handlers are called in a separate goroutine that may be executed in a different thread. To avoid a race condition, access to lists of free structures in the context should be serialized. This patch should fix random crashes in Go applications under high load. This is related to issues #253 and #309 on GitHub. --- src/nxt_unit.c | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 4 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 28a0de20..3f6a945f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -31,7 +31,7 @@ typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); -static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, +static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); @@ -204,6 +204,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + pthread_mutex_t mutex; + nxt_unit_port_id_t read_port_id; int read_port_fd; @@ -402,7 +404,10 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); - nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } cb = &lib->callbacks; @@ -446,15 +451,24 @@ fail: } -static void +static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data) { + int rc; + 
ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + rc = pthread_mutex_init(&ctx_impl->mutex, NULL); + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); + + return NXT_UNIT_ERROR; + } + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); @@ -470,6 +484,8 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->read_port_fd = -1; ctx_impl->requests.slot = 0; + + return NXT_UNIT_OK; } @@ -1029,7 +1045,11 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&ctx_impl->mutex); + if (nxt_queue_is_empty(&ctx_impl->free_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) + lib->request_data_size); if (nxt_slow_path(req_impl == NULL)) { @@ -1041,6 +1061,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) req_impl->req.unit = ctx->unit; req_impl->req.ctx = ctx; + pthread_mutex_lock(&ctx_impl->mutex); + } else { lnk = nxt_queue_first(&ctx_impl->free_req); nxt_queue_remove(lnk); @@ -1050,6 +1072,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl->req.data = lib->request_data_size ? 
req_impl->extra_data : NULL; return req_impl; @@ -1088,10 +1112,14 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_mmap_buf_free(req_impl->incoming_buf); } + pthread_mutex_lock(&ctx_impl->mutex); + nxt_queue_remove(&req_impl->link); nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); + pthread_mutex_unlock(&ctx_impl->mutex); + req_impl->state = NXT_UNIT_RS_RELEASED; } @@ -1120,7 +1148,11 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); + if (nxt_queue_is_empty(&ctx_impl->free_ws)) { + pthread_mutex_unlock(&ctx_impl->mutex); + ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); if (nxt_slow_path(ws_impl == NULL)) { nxt_unit_warn(ctx, "websocket frame allocation failed"); @@ -1132,6 +1164,8 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) lnk = nxt_queue_first(&ctx_impl->free_ws); nxt_queue_remove(lnk); + pthread_mutex_unlock(&ctx_impl->mutex); + ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); } @@ -1160,7 +1194,11 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) ws_impl->retain_buf = NULL; } + pthread_mutex_lock(&ws_impl->ctx_impl->mutex); + nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); + + pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); } @@ -1688,16 +1726,24 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); + if (ctx_impl->free_buf == NULL) { + pthread_mutex_unlock(&ctx_impl->mutex); + mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { nxt_unit_warn(ctx, "failed to allocate buf"); + + return NULL; } } else { mmap_buf = ctx_impl->free_buf; nxt_unit_mmap_buf_remove(mmap_buf); + + pthread_mutex_unlock(&ctx_impl->mutex); } mmap_buf->ctx_impl = ctx_impl; @@ -1711,7 +1757,11 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t 
*mmap_buf) { nxt_unit_mmap_buf_remove(mmap_buf); + pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); + nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); + + pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); } @@ -3298,7 +3348,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) close(fd); - nxt_unit_ctx_init(lib, new_ctx, data); + rc = nxt_unit_ctx_init(lib, new_ctx, data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->callbacks.remove_port(ctx, &new_port_id); + + free(new_ctx); + + return NULL; + } new_ctx->read_port_id = new_port_id; @@ -3350,6 +3407,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + pthread_mutex_destroy(&ctx_impl->mutex); + nxt_queue_remove(&ctx_impl->link); if (ctx_impl != &lib->main_ctx) { -- cgit From 4ea9ed309e958ad087ddcfd358688e2a2f105e39 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Sep 2019 18:31:06 +0300 Subject: Reducing number of warning messages. One alert per failed allocation is enough. --- src/nxt_unit.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 3f6a945f..18a07f9e 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -978,6 +978,9 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } else { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { + nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", + req_impl->stream); + return NXT_UNIT_ERROR; } @@ -1053,8 +1056,6 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) + lib->request_data_size); if (nxt_slow_path(req_impl == NULL)) { - nxt_unit_warn(ctx, "request info allocation failed"); - return NULL; } @@ -1155,8 +1156,6 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); if (nxt_slow_path(ws_impl == NULL)) { - nxt_unit_warn(ctx, "websocket frame allocation failed"); - return 
NULL; } @@ -1673,6 +1672,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) mmap_buf = nxt_unit_mmap_buf_get(req->ctx); if (nxt_slow_path(mmap_buf == NULL)) { + nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); + return NULL; } @@ -1733,8 +1734,6 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { - nxt_unit_warn(ctx, "failed to allocate buf"); - return NULL; } -- cgit From a216b15e991d6e7cdc7b61fe018d58dd387937bd Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Sep 2019 18:31:22 +0300 Subject: Fixing request release order to avoid crashes on exit. Each request references the router process structure that owns all memory maps. The process structure has a reference counter; each request increases the counter to lock the structure in memory until request processing ends. Incoming and outgoing buffers reference memory maps that the process owns, so the process structure should be released only when all buffers are released to avoid invalid memory access and a crash. This describes the libunit library mechanism used for application processes. The background of this issue is as follows: The issue was found on buildbot when the router crashed during Java websocket tests. The Java application receives a notification from the master process; when the notification is processed, libunit deletes the process structure from its process hash and decrements the use counter; however, active websocket connections maintain their use counts on the process structure. After that, when the master process is stopping the application, libunit releases active websocket connections. At this point, it's important to release the connections' memory buffers before the corresponding process structure and all shared memory segments are released. 
--- src/nxt_unit.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 18a07f9e..9696e9cd 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1093,12 +1093,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); - - req_impl->process = NULL; - } - if (req_impl->websocket) { nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); @@ -1113,6 +1107,16 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_mmap_buf_free(req_impl->incoming_buf); } + /* + * Process release should go after buffers release to guarantee mmap + * existence. + */ + if (req_impl->process != NULL) { + nxt_unit_process_use(req->ctx, req_impl->process, -1); + + req_impl->process = NULL; + } + pthread_mutex_lock(&ctx_impl->mutex); nxt_queue_remove(&req_impl->link); -- cgit From 6346e641eef4aacf92e81e0f1ea4f42ed1e62834 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 19 Sep 2019 16:28:03 +0300 Subject: Releasing WebSocket frame in case of buffer allocation failure. Found by Coverity (CID 349456). --- src/nxt_unit.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 9696e9cd..4497d09d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -981,6 +981,8 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", req_impl->stream); + nxt_unit_websocket_frame_release(&ws_impl->ws); + return NXT_UNIT_ERROR; } -- cgit From c554941b4f826d83d92d5ca8d7713bea4167896e Mon Sep 17 00:00:00 2001 From: Tiago de Bem Natel de Moura Date: Thu, 19 Sep 2019 15:25:23 +0300 Subject: Initial applications isolation support using Linux namespaces. 
--- src/nxt_unit.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 4497d09d..9ccd1fd9 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -333,6 +333,7 @@ nxt_unit_init(nxt_unit_init_t *init) } } + lib->pid = read_port.id.pid; ctx = &lib->main_ctx.ctx; rc = lib->callbacks.add_port(ctx, &ready_port); @@ -398,7 +399,6 @@ nxt_unit_create(nxt_unit_init_t *init) lib->processes.slot = NULL; lib->ports.slot = NULL; - lib->pid = getpid(); lib->log_fd = STDERR_FILENO; lib->online = 1; -- cgit