From 18fbfc3d5027df68b7696afb16323c66f2582100 Mon Sep 17 00:00:00 2001 From: Igor Sysoev Date: Mon, 6 Jul 2020 15:32:20 +0300 Subject: Destroying temporary router configuration. The lifespan of a listening socket is longer than the lifespans of both the router configuration and the temporary router configuration, so the sockets should be stored in persistent queues. Safety is ensured by the fact that the router processes only one new configuration at any time. --- src/nxt_router.c | 68 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 28 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 788199c7..88b87323 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -297,6 +297,14 @@ const nxt_process_init_t nxt_router_process = { }; +/* Queues of nxt_socket_conf_t */ +nxt_queue_t creating_sockets; +nxt_queue_t pending_sockets; +nxt_queue_t updating_sockets; +nxt_queue_t keeping_sockets; +nxt_queue_t deleting_sockets; + + static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) { @@ -1027,11 +1035,11 @@ nxt_router_temp_conf(nxt_task_t *task) goto temp_fail; } - nxt_queue_init(&tmcf->deleting); - nxt_queue_init(&tmcf->keeping); - nxt_queue_init(&tmcf->updating); - nxt_queue_init(&tmcf->pending); - nxt_queue_init(&tmcf->creating); + nxt_queue_init(&creating_sockets); + nxt_queue_init(&pending_sockets); + nxt_queue_init(&updating_sockets); + nxt_queue_init(&keeping_sockets); + nxt_queue_init(&deleting_sockets); #if (NXT_TLS) nxt_queue_init(&tmcf->tls); @@ -1088,11 +1096,11 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) tmcf = obj; - qlk = nxt_queue_first(&tmcf->pending); + qlk = nxt_queue_first(&pending_sockets); - if (qlk != nxt_queue_tail(&tmcf->pending)) { + if (qlk != nxt_queue_tail(&pending_sockets)) { nxt_queue_remove(qlk); - nxt_queue_insert_tail(&tmcf->creating, qlk); + nxt_queue_insert_tail(&creating_sockets, qlk); skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -1150,8 +1158,8 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) nxt_router_engines_post(router, tmcf); - nxt_queue_add(&router->sockets, &tmcf->updating); - nxt_queue_add(&router->sockets, &tmcf->creating); + nxt_queue_add(&router->sockets, &updating_sockets); + nxt_queue_add(&router->sockets, &creating_sockets); router->access_log = rtcf->access_log; @@ -1185,6 +1193,8 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) if (--tmcf->count == 0) { nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); + + nxt_mp_destroy(tmcf->mem_pool); } } @@ -1202,8 +1212,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_alert(task, "failed to apply new conf"); - for (qlk = nxt_queue_first(&tmcf->creating); - qlk != nxt_queue_tail(&tmcf->creating); + for (qlk = nxt_queue_first(&creating_sockets); + qlk != nxt_queue_tail(&creating_sockets); + qlk = nxt_queue_next(qlk)) { skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -1217,9 +1227,9 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) } nxt_queue_init(&new_socket_confs); - nxt_queue_add(&new_socket_confs, &tmcf->updating); - nxt_queue_add(&new_socket_confs, &tmcf->pending); - nxt_queue_add(&new_socket_confs, &tmcf->creating); + nxt_queue_add(&new_socket_confs, &updating_sockets); + nxt_queue_add(&new_socket_confs, &pending_sockets); + nxt_queue_add(&new_socket_confs, &creating_sockets); rtcf = tmcf->router_conf; @@ -1241,8 +1251,8 @@ 
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) router = rtcf->router; - nxt_queue_add(&router->sockets, &tmcf->keeping); - nxt_queue_add(&router->sockets, &tmcf->deleting); + nxt_queue_add(&router->sockets, &keeping_sockets); + nxt_queue_add(&router->sockets, &deleting_sockets); nxt_queue_add(&router->apps, &tmcf->previous); @@ -1253,6 +1263,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_mp_destroy(rtcf->mem_pool); nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); + + nxt_mp_destroy(tmcf->mem_pool); } @@ -1902,7 +1914,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->access_log = access_log; } - nxt_queue_add(&tmcf->deleting, &router->sockets); + nxt_queue_add(&deleting_sockets, &router->sockets); nxt_queue_init(&router->sockets); return NXT_OK; @@ -2141,15 +2153,15 @@ nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, nskcf->listen = skcf->listen; nxt_queue_remove(qlk); - nxt_queue_insert_tail(&tmcf->keeping, qlk); + nxt_queue_insert_tail(&keeping_sockets, qlk); - nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); + nxt_queue_insert_tail(&updating_sockets, &nskcf->link); return NXT_OK; } } - nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); + nxt_queue_insert_tail(&pending_sockets, &nskcf->link); return NXT_DECLINED; } @@ -2577,13 +2589,13 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -2599,19 +2611,19 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, nxt_router_listen_socket_update); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); + ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -2631,12 +2643,12 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, return ret; } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); + ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); + return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); } -- cgit From 762511c5105119c45c676578f45473c7f906de60 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 23 Jul 2020 14:25:46 +0300 Subject: Fixing request_app_link reference counting. Race conditions were reproduced periodically by test_python_process_switch. 
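The root cause is visible in the first hunk below: the release handler decremented use_count and then called nxt_request_app_link_release() unconditionally, even when other references were still held. Routing the decrement through nxt_request_app_link_use() makes the release conditional on the counter actually reaching zero. A minimal stand-alone sketch of that pattern using C11 atomics (the names here are illustrative; the real nxt_request_app_link_use() additionally posts the release to the event engine that owns the link):

    #include <stdatomic.h>
    #include <stdlib.h>

    typedef struct {
        atomic_int  use_count;
        /* ... request state ... */
    } request_link_t;

    static void
    request_link_release(request_link_t *ra)
    {
        free(ra);
    }

    static void
    request_link_use(request_link_t *ra, int i)
    {
        int  c;

        /* atomic_fetch_add() returns the previous value, so the
         * counter reached zero exactly when that value equals -i. */
        c = atomic_fetch_add(&ra->use_count, i);

        if (i < 0 && c == -i) {
            /* Last reference is gone; release exactly once. */
            request_link_release(ra);
        }
    }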
--- src/nxt_router.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 88b87323..bf82501c 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -731,9 +731,7 @@ nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) nxt_assert(req_app_link->work.data == data); - nxt_atomic_fetch_add(&req_app_link->use_count, -1); - - nxt_request_app_link_release(task, req_app_link); + nxt_request_app_link_use(task, req_app_link, -1); } @@ -4695,7 +4693,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) &req_app_link->link_app_requests); } - ra_use_delta++; + nxt_request_app_link_inc_use(req_app_link); nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", req_app_link->stream); -- cgit From c617480eefc0822d52f9153906bb526ad483b9a3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Sat, 25 Jul 2020 11:06:32 +0300 Subject: Using plain shared memory for configuration pass. There are no restrictions on configuration size, and using segmented shared memory only doubles memory usage, because to parse the configuration on the router side it needs to be 'plain', i.e. located in a single contiguous memory buffer. --- src/nxt_router.c | 49 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 14 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index bf82501c..d4d037e1 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -906,8 +906,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + void *p; + size_t size; nxt_int_t ret; - nxt_buf_t *b; nxt_router_temp_conf_t *tmcf; tmcf = nxt_router_temp_conf(task); @@ -915,9 +916,33 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", - nxt_buf_used_size(msg->buf), - (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); + if (nxt_slow_path(msg->fd == -1)) { + nxt_alert(task, "conf_data_handler: invalid file shm fd"); + return; + } + + if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { + nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", + (int) nxt_buf_mem_used_size(&msg->buf->mem)); + + nxt_fd_close(msg->fd); + msg->fd = -1; + + return; + } + + nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); + + p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0); + + nxt_fd_close(msg->fd); + msg->fd = -1; + + if (nxt_slow_path(p == MAP_FAILED)) { + return; + } + + nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); tmcf->router_conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; @@ -928,20 +953,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_slow_path(tmcf->port == NULL)) { nxt_alert(task, "reply port not found"); - return; + goto fail; } nxt_port_use(task, tmcf->port, 1); - b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, - msg->buf, msg->size); - if (nxt_slow_path(b == NULL)) { - nxt_router_conf_error(task, tmcf); - - return; - } - - ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); + ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); if (nxt_fast_path(ret == NXT_OK)) { nxt_router_conf_apply(task, tmcf, NULL); @@ -949,6 +966,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } else { 
nxt_router_conf_error(task, tmcf); } + +fail: + + nxt_mem_munmap(p, size); } -- cgit From 78fd04adcf398a00549c4912f68eff77c94ab6c0 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 7 Aug 2020 15:06:18 +0300 Subject: Fixing listen event connection leakage. A connection object is allocated in advance for each listen event object to be used for the established connection. This connection needs to be freed when the listen event is destroyed. --- src/nxt_router.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index d4d037e1..b3e326d0 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -3178,6 +3178,10 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_debug(task, "listen event count: %D", lev->count); if (--lev->count == 0) { + if (lev->next != NULL) { + nxt_conn_free(task, lev->next); + } + nxt_free(lev); } -- cgit From 0f3abebd019130a6e4e69e53345f403ba802edfb Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Sun, 9 Aug 2020 10:22:05 +0300 Subject: Fixing connection remote sockaddr leakage. The earlier patch 1bf971f83571 fixed the connection leakage, but freeing a connection also requires releasing its remote sockaddr separately. --- src/nxt_router.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index b3e326d0..8b3f3daf 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -3177,8 +3177,12 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_debug(task, "listen event count: %D", lev->count); + engine = task->thread->engine; + if (--lev->count == 0) { if (lev->next != NULL) { + nxt_sockaddr_cache_free(engine, lev->next); + nxt_conn_free(task, lev->next); } nxt_free(lev); } @@ -3189,8 +3193,6 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_router_conf_release(task, joint); } - engine = task->thread->engine; - if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { nxt_thread_exit(task->thread); } -- cgit From 3a721e1d96720505d4d6638e77d2c296d962519c Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Sun, 9 Aug 2020 10:26:19 +0300 Subject: Fixing leaked configuration objects. If there are no listen sockets, the router configuration usage counter remains 0, so the decrement that normally triggers its release never happens. The only moment such a configuration can be released is right after a configuration update. 
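In outline, the release point this patch adds to nxt_router_conf_ready(), condensed from the hunk below (routes and access-log cleanup omitted): once the last temporary-configuration reference is gone, the configuration's own counter is read under the router lock, and a count of 0 means no listen socket ever took a reference, so this is the last chance to free it.

    if (--tmcf->count > 0) {
        return;
    }

    nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);

    rtcf = tmcf->router_conf;
    lock = &rtcf->router->lock;

    /* Read the usage counter under the router lock. */
    nxt_thread_spin_lock(lock);
    count = rtcf->count;
    nxt_thread_spin_unlock(lock);

    if (count == 0) {
        /* No listen socket references this configuration. */
        nxt_mp_destroy(rtcf->mem_pool);
    }

    nxt_mp_destroy(tmcf->mem_pool);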
--- src/nxt_router.c | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 8b3f3daf..758310a9 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1208,13 +1208,39 @@ nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) static void nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { - nxt_debug(task, "temp conf count:%D", tmcf->count); + uint32_t count; + nxt_router_conf_t *rtcf; + nxt_thread_spinlock_t *lock; - if (--tmcf->count == 0) { - nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); + nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); - nxt_mp_destroy(tmcf->mem_pool); + if (--tmcf->count > 0) { + return; } + + nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); + + rtcf = tmcf->router_conf; + + lock = &rtcf->router->lock; + + nxt_thread_spin_lock(lock); + + count = rtcf->count; + + nxt_thread_spin_unlock(lock); + + nxt_debug(task, "rtcf %p: %D", rtcf, count); + + if (count == 0) { + nxt_http_routes_cleanup(task, rtcf->routes); + + nxt_router_access_log_release(task, lock, rtcf->access_log); + + nxt_mp_destroy(rtcf->mem_pool); + } + + nxt_mp_destroy(tmcf->mem_pool); } -- cgit From 3cbc22a6dc45abdeade4deb364601230ddca02c1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:10 +0300 Subject: Changing router to application port exchange protocol. The application process needs to request the port from the router instead of the latter pushing the port before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router to use the application shared port and then the queue. 
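In outline, the router side of the new exchange, condensed from nxt_router_get_port_handler() in the hunk below (validation and error paths omitted): the application names a (pid, id) pair, the router looks the port up in the runtime port hash, and the port's descriptor is sent back over the reply port on the same stream.

    get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;

    /* Look up the requested port by its (pid, id) pair. */
    port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);

    if (port != NULL) {
        (void) nxt_port_send_port(task, reply_port, port,
                                  msg->port_msg.stream);
    }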
--- src/nxt_router.c | 100 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 17 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 758310a9..3380e133 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); static void nxt_router_thread_start(void *data); +static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, + void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data); static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, @@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { static const nxt_port_handlers_t nxt_router_process_port_handlers = { .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, + .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, .data = nxt_router_conf_data_handler, @@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data) nxt_int_t ret; nxt_port_t *port; nxt_task_t *task; + nxt_work_t *work; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; @@ -2988,10 +2994,42 @@ nxt_router_thread_start(void *data) nxt_port_enable(task, port, &nxt_router_app_port_handlers); + work = nxt_zalloc(sizeof(nxt_work_t)); + if (nxt_slow_path(work == NULL)) { + return; + } + + work->handler = nxt_router_rt_add_port; + work->task = link->work.task; + work->obj = work; + work->data = port; + + nxt_event_engine_post(link->work.task->thread->engine, work); + nxt_event_engine_start(engine); } +static void +nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t res; + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + port = data; + + nxt_free(obj); + + res = nxt_port_hash_add(&rt->ports, port); + + if (nxt_fast_path(res == NXT_OK)) { + nxt_port_use(task, port, 1); + } +} + + static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { @@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) } /* TODO remove engine->port */ - /* TODO excude from connected ports */ if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); @@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, { nxt_buf_t *buf; nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_port_t *port, *reply_port; nxt_apr_action_t apr_action; nxt_assert(req_app_link->app_port != NULL); @@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task, apr_action = NXT_APR_REQUEST_FAILED; - c_port = nxt_process_connected_port_find(port->process, reply_port); - - if (nxt_slow_path(c_port != reply_port)) { - res = nxt_port_send_port(task, port, reply_port, 0); - - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send reply port to application"); - - goto release_port; - } - - nxt_process_connected_port_add(port->process, reply_port); - } - buf = 
nxt_router_prepare_msg(task, req_app_link->request, port, nxt_app_msg_prefix[port->app->type]); @@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -1, 0, 0, NULL); } } + + +static void +nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port, *reply_port; + nxt_runtime_t *rt; + nxt_port_msg_get_port_t *get_port_msg; + + rt = task->thread->runtime; + + reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_port_t))) + { + nxt_alert(task, "get_port_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; + + port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_port_handler: port %PI:%d not found", + get_port_msg->pid, get_port_msg->id); + + return; + } + + nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, + get_port_msg->id); + + (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); +} -- cgit From 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:13 +0300 Subject: Changing router to application shared memory exchange protocol. The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue. 
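In outline, the router side of this exchange, condensed from nxt_router_get_mmap_handler() in the hunk below (validation and error paths omitted): the application asks for one of the process's outgoing shared memory segments by id, and the router answers with that segment's file descriptor in a plain mmap message.

    get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;

    mmaps = &port->process->outgoing;

    /* Pick the descriptor of the requested segment under the lock. */
    nxt_thread_mutex_lock(&mmaps->mutex);
    fd = mmaps->elts[get_mmap_msg->id].mmap_handler->fd;
    nxt_thread_mutex_unlock(&mmaps->mutex);

    (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP,
                                 fd, 0, 0, NULL);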
--- src/nxt_router.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 3380e133..4df1489d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -257,6 +257,8 @@ static void nxt_router_http_request_release(nxt_task_t *task, void *obj, static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_mmap_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -281,6 +283,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = { .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, + .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, @@ -5008,7 +5011,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, buf = req_app_link->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, + res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, &req_app_link->msg_info.tracking, req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { @@ -5138,7 +5141,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, port, + out = nxt_port_mmap_get_buf(task, &port->process->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5320,7 +5323,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, port, free_size); + buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, + free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5555,6 +5559,65 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +static void +nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_fd_t fd; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_port_mmaps_t *mmaps; + nxt_port_msg_get_mmap_t *get_mmap_msg; + nxt_port_mmap_handler_t *mmap_handler; + + rt = task->thread->runtime; + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_mmap_t))) + { + nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; + + nxt_assert(port->type == NXT_PROCESS_APP); + + mmaps = &port->process->outgoing; + nxt_thread_mutex_lock(&mmaps->mutex); + + if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", + (int) get_mmap_msg->id); + + return; + } + + mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; + + fd = mmap_handler->fd; + + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_debug(task, "get mmap %PI:%d found", + msg->port_msg.pid, (int) get_mmap_msg->id); + + (void) 
nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); +} + + static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { -- cgit From 83595606121a821f9e3cef0f0b7e7fe87eb1e50a Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:15 +0300 Subject: Introducing the shared application port. This is the port shared between all application processes; requests are passed through it to the application for processing. Using it significantly simplifies the request processing code in the router. The drawbacks are two more file descriptors per configured application and more complex libunit message wait/read code. --- src/nxt_router.c | 1508 +++++++++++++++++------------------------------------- 1 file changed, 464 insertions(+), 1044 deletions(-) (limited to 'src/nxt_router.c') diff --git a/src/nxt_router.c b/src/nxt_router.c index 4df1489d..44b303e4 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -61,59 +61,13 @@ typedef struct { } nxt_app_rpc_t; -struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - nxt_port_t *failed_port; - int failed_port_use_delta; - - uint8_t start_process; /* 1 bit */ - nxt_request_app_link_t *shared_ra; - nxt_port_t *port; -}; - -typedef struct nxt_port_select_state_s nxt_port_select_state_t; - static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); -static void nxt_router_port_select(nxt_task_t *task, - nxt_port_select_state_t *state); - -static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, - nxt_port_select_state_t *state); - static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); -static void nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); - - nxt_inline void -nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) -{ - nxt_atomic_fetch_add(&req_app_link->use_count, 1); -} - nxt_inline void -nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) -{ -#if (NXT_DEBUG) - int c; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - nxt_assert((c + i) > 0); -#else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); -#endif -} - -static void nxt_request_app_link_use(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, int i); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -196,6 +150,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf); @@ -220,6 +176,8 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, + nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -227,13 +185,15 @@ static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); static void 
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link); +static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, + void *data); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); + nxt_request_rpc_data_t *req_rpc_data); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, - nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); + nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, @@ -250,7 +210,7 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i); -static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, +static void nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); @@ -501,83 +461,6 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) } -nxt_inline void -nxt_request_app_link_init(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_buf_t *body; - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - - req_app_link->stream = req_rpc_data->stream; - req_app_link->use_count = 1; - req_app_link->req_rpc_data = req_rpc_data; - req_rpc_data->req_app_link = req_app_link; - req_app_link->reply_port = engine->port; - req_app_link->request = req_rpc_data->request; - req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - - req_app_link->work.handler = NULL; - req_app_link->work.task = &engine->task; - req_app_link->work.obj = req_app_link; - req_app_link->work.data = engine; - - body = req_rpc_data->request->body; - - if (body != NULL && nxt_buf_is_file(body)) { - req_app_link->body_fd = body->file->fd; - - body->file->fd = -1; - - } else { - req_app_link->body_fd = -1; - } -} - - -nxt_inline nxt_request_app_link_t * -nxt_request_app_link_alloc(nxt_task_t *task, - nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_mp_t *mp; - nxt_request_app_link_t *req_app_link; - - if (ra_src != NULL && ra_src->mem_pool != NULL) { - return ra_src; - } - - mp = req_rpc_data->request->mem_pool; - - req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - - if (nxt_slow_path(req_app_link == NULL)) { - - req_rpc_data->req_app_link = NULL; - - if (ra_src != NULL) { - ra_src->req_rpc_data = NULL; - } - - return NULL; - } - - nxt_mp_retain(mp); - - nxt_request_app_link_init(task, req_app_link, req_rpc_data); - - if (ra_src != NULL) { - req_app_link->body_fd = ra_src->body_fd; - } - - req_app_link->mem_pool = mp; - - return req_app_link; -} - - nxt_inline nxt_bool_t nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, uint32_t stream) @@ -614,198 +497,6 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, } -static void -nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, - void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - 
nxt_request_app_link_update_peer(task, req_app_link); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_event_engine_t *engine; - nxt_request_rpc_data_t *req_rpc_data; - - engine = req_app_link->work.data; - - if (task->thread->engine != engine) { - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_update_peer_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, "req_app_link stream #%uD post update peer to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - - return; - } - - nxt_debug(task, "req_app_link stream #%uD update peer", - req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL && req_app_link->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, - req_app_link->app_port->pid); - } -} - - -static void -nxt_request_app_link_release(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_mp_t *mp; - nxt_http_request_t *r; - nxt_request_rpc_data_t *req_rpc_data; - - nxt_assert(task->thread->engine == req_app_link->work.data); - nxt_assert(req_app_link->use_count == 0); - - nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL) { - if (nxt_slow_path(req_app_link->err_code != 0)) { - nxt_http_request_error(task, req_rpc_data->request, - req_app_link->err_code); - - } else { - req_rpc_data->app_port = req_app_link->app_port; - req_rpc_data->apr_action = req_app_link->apr_action; - req_rpc_data->msg_info = req_app_link->msg_info; - - if (req_rpc_data->app->timeout != 0) { - r = req_rpc_data->request; - - r->timer.handler = nxt_router_app_timeout; - r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); - } - - req_app_link->app_port = NULL; - req_app_link->msg_info.buf = NULL; - } - - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - } - - if (req_app_link->app_port != NULL) { - nxt_router_app_port_release(task, req_app_link->app_port, - req_app_link->apr_action); - - req_app_link->app_port = NULL; - } - - if (req_app_link->body_fd != -1) { - nxt_fd_close(req_app_link->body_fd); - - req_app_link->body_fd = -1; - } - - nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); - - mp = req_app_link->mem_pool; - - if (mp != NULL) { - nxt_mp_free(mp, req_app_link); - nxt_mp_release(mp); - } -} - - -static void -nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_assert(req_app_link->work.data == data); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, - int i) -{ - int c; - nxt_event_engine_t *engine; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - if (i < 0 && c == -i) { - engine = req_app_link->work.data; - - if (task->thread->engine == engine) { - nxt_request_app_link_release(task, req_app_link); - - return; - } - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_release_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, 
"req_app_link stream #%uD post release to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - } -} - - -nxt_inline void -nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link, const char *str) -{ - req_app_link->app_port = NULL; - req_app_link->err_code = 500; - req_app_link->err_str = str; - - nxt_alert(task, "app \"%V\" internal error: %s on #%uD", - &app->name, str, req_app_link->stream); -} - - -nxt_inline void -nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, - &req_app_link->link_port_pending); - nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->res_time = nxt_thread_monotonic_time(task->thread) - + app->res_timeout; - - nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", - req_app_link->stream); -} - - nxt_inline nxt_bool_t nxt_queue_chk_remove(nxt_queue_link_t *lnk) { @@ -825,8 +516,9 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - int ra_use_delta; - nxt_request_app_link_t *req_app_link; + nxt_http_request_t *r; + + nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->app_port != NULL) { nxt_router_app_port_release(task, req_rpc_data->app_port, @@ -835,53 +527,34 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - - req_app_link = req_rpc_data->req_app_link; - if (req_app_link != NULL) { - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - - ra_use_delta = 0; - - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + if (req_rpc_data->app != NULL) { + nxt_router_app_use(task, req_rpc_data->app, -1); - if (req_app_link->link_app_requests.next == NULL - && req_app_link->link_port_pending.next == NULL - && req_app_link->link_app_pending.next == NULL - && req_app_link->link_port_websockets.next == NULL) - { - req_app_link = NULL; + req_rpc_data->app = NULL; + } - } else { - ra_use_delta -= - nxt_queue_chk_remove(&req_app_link->link_app_requests) - + nxt_queue_chk_remove(&req_app_link->link_port_pending) - + nxt_queue_chk_remove(&req_app_link->link_port_websockets); + r = req_rpc_data->request; - nxt_queue_chk_remove(&req_app_link->link_app_pending); - } + if (r != NULL) { + r->timer_data = NULL; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_router_http_request_release_post(task, r); - if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, ra_use_delta); - } + r->req_rpc_data = NULL; + req_rpc_data->request = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_fd_close(req_rpc_data->msg_info.body_fd); - req_rpc_data->app = NULL; + req_rpc_data->msg_info.body_fd = -1; } - if (req_rpc_data->request != NULL) { - req_rpc_data->request->timer_data = NULL; - - nxt_router_http_request_done(task, req_rpc_data->request); + if (req_rpc_data->rpc_cancel) { + req_rpc_data->rpc_cancel = 0; - req_rpc_data->request->req_rpc_data = NULL; - req_rpc_data->request = NULL; + nxt_port_rpc_cancel(task, task->thread->engine->port, + req_rpc_data->stream); } } @@ -889,25 +562,62 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, void 
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_app_t *app; + nxt_port_t *port, *main_app_port; + nxt_runtime_t *rt; + nxt_port_new_port_handler(task, msg); - if (msg->u.new_port != NULL - && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) - { + port = msg->u.new_port; + + if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { nxt_router_greet_controller(task, msg->u.new_port); } - if (msg->port_msg.stream == 0) { - return; - } + if (port == NULL || port->type != NXT_PROCESS_APP) { + + if (msg->port_msg.stream == 0) { + return; + } - if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_APP) - { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } - nxt_port_rpc_handler(task, msg); + if (msg->port_msg.stream != 0) { + nxt_port_rpc_handler(task, msg); + return; + } + + /* + * Port with "id == 0" is application 'main' port and it always + * should come with non-zero stream. + */ + nxt_assert(port->id != 0); + + /* Find 'main' app port and get app reference. */ + rt = task->thread->runtime; + + /* + * It is safe to access 'runtime->ports' hash because 'NEW_PORT' + * sent to main port (with id == 0) and processed in main thread. + */ + main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); + nxt_assert(main_app_port != NULL); + + app = main_app_port->app; + nxt_assert(app != NULL); + + nxt_thread_mutex_lock(&app->mutex); + + /* TODO here should be find-and-add code because there can be + port waiters in port_hash */ + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; + + nxt_thread_mutex_unlock(&app->mutex); + + port->app = app; + port->main_app_port = main_app_port; } @@ -1100,8 +810,10 @@ nxt_router_app_can_start(nxt_app_t *app) nxt_inline nxt_bool_t nxt_router_app_need_start(nxt_app_t *app) { - return app->idle_processes + app->pending_processes - < app->spare_processes; + return (app->active_requests + > app->port_hash_count + app->pending_processes) + || (app->spare_processes + > app->idle_processes + app->pending_processes); } @@ -1530,6 +1242,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_str_t *t, *s, *targets; nxt_uint_t n, i; + nxt_port_t *port; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1744,8 +1457,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); - nxt_queue_init(&app->requests); - nxt_queue_init(&app->pending); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1758,7 +1469,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; - app->max_pending_responses = 2; app->max_requests = apcf.requests; app->targets = targets; @@ -1789,6 +1499,25 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.handler = nxt_router_free_app; app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; + + port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + + nxt_port_write_enable(task, port); + port->app = app; + + 
app->shared_port = port; + + nxt_thread_mutex_create(&app->outgoing.mutex); } } @@ -2522,7 +2251,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; port = msg->u.new_port; + + nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); + port->app = app; + port->main_app_port = port; app->pending_processes--; app->processes++; @@ -2532,11 +2267,15 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_insert_tail(&app->ports, &port->app_link); nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; port->idle_start = 0; nxt_port_inc_use(port); + nxt_router_app_shared_port_send(task, port); + nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); } @@ -2939,10 +2678,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) static nxt_port_handlers_t nxt_router_app_port_handlers = { - .rpc_error = nxt_port_rpc_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_rpc_handler, - .oosm = nxt_router_oosm_handler, + .rpc_error = nxt_port_rpc_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, + .req_headers_ack = nxt_port_rpc_handler, }; @@ -3736,22 +3476,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; nxt_http_request_t *r; nxt_unit_response_t *resp; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; - b = msg->buf; req_rpc_data = data; - if (msg->size == 0) { - b = NULL; - } - r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; @@ -3762,19 +3497,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } + app = req_rpc_data->app; + nxt_assert(app != NULL); + + if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { + nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); + + return; + } + + b = (msg->size == 0) ? 
NULL : msg->buf; + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); nxt_buf_chain_add(&b, nxt_http_buf_last(r)); + req_rpc_data->rpc_cancel = 0; + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { + if (app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); } } @@ -3870,39 +3618,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) { - req_app_link = nxt_request_app_link_alloc(task, - req_rpc_data->req_app_link, - req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - app_port = req_app_link->app_port; - - if (app_port == NULL && req_rpc_data->app_port != NULL) { - req_app_link->app_port = req_rpc_data->app_port; - app_port = req_app_link->app_port; - req_app_link->apr_action = req_rpc_data->apr_action; - - req_rpc_data->app_port = NULL; - } - + app_port = req_rpc_data->app_port; if (nxt_slow_path(app_port == NULL)) { goto fail; } - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + nxt_thread_mutex_lock(&app->mutex); - nxt_queue_insert_tail(&app_port->active_websockets, - &req_app_link->link_port_websockets); + app_port->main_app_port->active_websockets++; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_thread_mutex_unlock(&app->mutex); nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); - req_app_link->apr_action = NXT_APR_CLOSE; + req_rpc_data->apr_action = NXT_APR_CLOSE; - nxt_debug(task, "req_app_link stream #%uD upgrade", - req_app_link->stream); + nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); r->state = &nxt_http_websocket; @@ -3921,8 +3651,96 @@ fail: } -static const nxt_http_request_state_t nxt_http_request_send_state - nxt_aligned(64) = +static void +nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) +{ + nxt_app_t *app; + nxt_bool_t start_process; + nxt_port_t *app_port, *main_app_port, *idle_port; + nxt_queue_link_t *idle_lnk; + nxt_http_request_t *r; + + nxt_debug(task, "stream #%uD: got ack from %PI:%d", + req_rpc_data->stream, + msg->port_msg.pid, msg->port_msg.reply_port); + + nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, + msg->port_msg.pid); + + app = req_rpc_data->app; + + start_process = 0; + + nxt_thread_mutex_lock(&app->mutex); + + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(app_port == NULL)) { + nxt_thread_mutex_unlock(&app->mutex); + + r = req_rpc_data->request; + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + + return; + } + + main_app_port = app_port->main_app_port; + + if (nxt_queue_chk_remove(&main_app_port->idle_link)) { + app->idle_processes--; + + /* Check port was in 'spare_ports' using idle_start field. */ + if (main_app_port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + /* + * If there is a vacant space in spare ports, + * move the last idle to spare_ports. 
+ */ + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + } + + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; + } + } + + main_app_port->active_requests++; + + nxt_port_inc_use(app_port); + + nxt_thread_mutex_unlock(&app->mutex); + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + nxt_port_use(task, req_rpc_data->app_port, -1); + + req_rpc_data->app_port = app_port; + + if (app->timeout != 0) { + r = req_rpc_data->request; + + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); + } +} + + +static const nxt_http_request_state_t nxt_http_request_send_state + nxt_aligned(64) = { .error_handler = nxt_http_request_error_handler, }; @@ -3949,42 +3767,14 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; req_rpc_data = data; - req_app_link = req_rpc_data->req_app_link; - - if (req_app_link != NULL) { - cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, - req_app_link->stream); - if (cancelled) { - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); - - if (res == NXT_OK) { - port = req_app_link->app_port; - - if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, - "port is NULL in cancelled req_app_link"); - return; - } - - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, - req_rpc_data, port->pid); - - nxt_router_app_prepare_request(task, req_app_link); - } + req_rpc_data->rpc_cancel = 0; - msg->port_msg.last = 0; - - return; - } - } + /* TODO cancel message and return if cancelled. 
*/ + // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->request != NULL) { nxt_http_request_error(task, req_rpc_data->request, @@ -4008,6 +3798,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); app = app_joint->app; @@ -4022,6 +3814,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, } port->app = app; + port->main_app_port = port; nxt_thread_mutex_lock(&app->mutex); @@ -4029,24 +3822,60 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; app->processes++; + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; nxt_thread_mutex_unlock(&app->mutex); nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); + nxt_router_app_shared_port_send(task, port); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } +static nxt_int_t +nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) +{ + nxt_buf_t *b; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; + + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + port = app_port->app->shared_port; + + nxt_debug(task, "send port %FD to process %PI", + port->pair[0], app_port->pid); + + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; + + msg->id = port->id; + msg->pid = port->pid; + msg->max_size = port->max_size; + msg->max_share = port->max_share; + msg->type = port->type; + + return nxt_port_socket_twrite(task, app_port, + NXT_PORT_MSG_NEW_PORT, + port->pair[0], + 0, 0, b, NULL); +} + + static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link; + nxt_app_t *app; + nxt_app_joint_t *app_joint; app_joint = data; @@ -4070,32 +3899,11 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; - if (!nxt_queue_is_empty(&app->requests)) { - lnk = nxt_queue_last(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); - - } else { - req_app_link = NULL; - } - nxt_thread_mutex_unlock(&app->mutex); - if (req_app_link != NULL) { - nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, req_app_link->stream); - - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start application process"); - nxt_request_app_link_use(task, req_app_link, -1); - } + /* TODO req_app_link to cancel first pending message */ } -nxt_inline nxt_port_t * -nxt_router_app_get_port_for_quit(nxt_app_t *app); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) @@ -4116,63 +3924,6 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) } -nxt_inline nxt_bool_t -nxt_router_app_first_port_busy(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - return port->app_pending_responses > 0; -} - - -nxt_inline nxt_port_t * -nxt_router_pop_first_port(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = 
nxt_queue_first(&app->ports);
-    nxt_queue_remove(lnk);
-
-    port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
-    port->app_pending_responses++;
-
-    if (nxt_queue_chk_remove(&port->idle_link)) {
-        app->idle_processes--;
-
-        if (port->idle_start == 0) {
-            nxt_assert(app->idle_processes < app->spare_processes);
-
-        } else {
-            nxt_assert(app->idle_processes >= app->spare_processes);
-
-            port->idle_start = 0;
-        }
-    }
-
-    if ((app->max_pending_responses == 0
-         || port->app_pending_responses < app->max_pending_responses)
-        && (app->max_requests == 0
-            || port->app_responses + port->app_pending_responses
-               < app->max_requests))
-    {
-        nxt_queue_insert_tail(&app->ports, lnk);
-
-        nxt_port_inc_use(port);
-
-    } else {
-        lnk->next = NULL;
-    }
-
-    return port;
-}
-
-
 nxt_inline nxt_port_t *
 nxt_router_app_get_port_for_quit(nxt_app_t *app)
 {
@@ -4184,12 +3935,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
 
     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
 
-        if (port->app_pending_responses > 0) {
-            port = NULL;
-
-            continue;
-        }
-
         /* Caller is responsible to decrease port use count. */
         nxt_queue_chk_remove(&port->app_link);
 
@@ -4197,6 +3942,9 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
             app->idle_processes--;
         }
 
+        nxt_port_hash_remove(&app->port_hash, port);
+        app->port_hash_count--;
+
         port->app = NULL;
         app->processes--;
 
@@ -4221,72 +3969,37 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
 }
 
 
-static void
-nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
-{
-    nxt_request_app_link_t  *req_app_link;
-
-    req_app_link = data;
-
-#if (NXT_DEBUG)
-    {
-    nxt_app_t  *app;
-
-    app = obj;
-
-    nxt_assert(app != NULL);
-    nxt_assert(req_app_link != NULL);
-    nxt_assert(req_app_link->app_port != NULL);
-
-    nxt_debug(task, "app '%V' %p process next stream #%uD",
-              &app->name, app, req_app_link->stream);
-    }
-#endif
-
-    nxt_router_app_prepare_request(task, req_app_link);
-
-    nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
 static void
 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
     nxt_apr_action_t action)
 {
-    int                      inc_use;
-    uint32_t                 dec_pending, got_response;
-    nxt_app_t                *app;
-    nxt_bool_t               port_unchained;
-    nxt_bool_t               send_quit, cancelled, adjust_idle_timer;
-    nxt_queue_link_t         *lnk;
-    nxt_request_app_link_t   *req_app_link, *pending_ra, *re_ra;
-    nxt_port_select_state_t  state;
+    int         inc_use;
+    uint32_t    got_response, dec_requests;
+    nxt_app_t   *app;
+    nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
+    nxt_port_t  *main_app_port;
 
     nxt_assert(port != NULL);
     nxt_assert(port->app != NULL);
 
-    req_app_link = NULL;
-
     app = port->app;
 
     inc_use = 0;
-    dec_pending = 0;
     got_response = 0;
+    dec_requests = 0;
 
     switch (action) {
     case NXT_APR_NEW_PORT:
         break;
     case NXT_APR_REQUEST_FAILED:
-        dec_pending = 1;
+        dec_requests = 1;
         inc_use = -1;
         break;
     case NXT_APR_GOT_RESPONSE:
-        dec_pending = 1;
         got_response = 1;
         inc_use = -1;
         break;
     case NXT_APR_UPGRADE:
-        dec_pending = 1;
         got_response = 1;
         break;
     case NXT_APR_CLOSE:
@@ -4294,120 +4007,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
         break;
     }
 
-    nxt_thread_mutex_lock(&app->mutex);
-
-    port->app_pending_responses -= dec_pending;
-    port->app_responses += got_response;
+    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
+              port->pid, port->id,
+              (int) inc_use, (int) got_response);
 
-    if (port->pair[1] != -1
-        && (app->max_pending_responses == 0
-            || port->app_pending_responses < app->max_pending_responses)
-        && (app->max_requests == 0
-            || port->app_responses + port->app_pending_responses
-               < app->max_requests))
-    {
-        if (port->app_link.next == NULL) {
-            if (port->app_pending_responses > 0) {
-                nxt_queue_insert_tail(&app->ports, &port->app_link);
+    if (port == app->shared_port) {
+        nxt_thread_mutex_lock(&app->mutex);
 
-            } else {
-                nxt_queue_insert_head(&app->ports, &port->app_link);
-            }
+        app->active_requests -= got_response + dec_requests;
 
-            nxt_port_inc_use(port);
+        nxt_thread_mutex_unlock(&app->mutex);
 
-        } else {
-            if (port->app_pending_responses == 0
-                && nxt_queue_first(&app->ports) != &port->app_link)
-            {
-                nxt_queue_remove(&port->app_link);
-                nxt_queue_insert_head(&app->ports, &port->app_link);
-            }
-        }
+        goto adjust_use;
     }
 
-    if (!nxt_queue_is_empty(&app->ports)
-        && !nxt_queue_is_empty(&app->requests))
-    {
-        lnk = nxt_queue_first(&app->requests);
-        nxt_queue_remove(lnk);
-        lnk->next = NULL;
-
-        req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
-                                           link_app_requests);
+    main_app_port = port->main_app_port;
 
-        req_app_link->app_port = nxt_router_pop_first_port(app);
+    nxt_thread_mutex_lock(&app->mutex);
 
-        if (req_app_link->app_port->app_pending_responses > 1) {
-            nxt_request_app_link_pending(task, app, req_app_link);
-        }
-    }
+    main_app_port->app_responses += got_response;
+    main_app_port->active_requests -= got_response + dec_requests;
+    app->active_requests -= got_response + dec_requests;
 
-    /* Pop first pending request for this port. */
-    if (dec_pending > 0
-        && !nxt_queue_is_empty(&port->pending_requests))
+    if (main_app_port->pair[1] != -1
+        && (app->max_requests == 0
+            || main_app_port->app_responses < app->max_requests))
     {
-        lnk = nxt_queue_first(&port->pending_requests);
-        nxt_queue_remove(lnk);
-        lnk->next = NULL;
-
-        pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
-                                         link_port_pending);
-
-        nxt_assert(pending_ra->link_app_pending.next != NULL);
-
-        nxt_queue_remove(&pending_ra->link_app_pending);
-        pending_ra->link_app_pending.next = NULL;
-
-    } else {
-        pending_ra = NULL;
-    }
-
-    /* Try to cancel and re-schedule first stalled request for this app. */
-    if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
-        lnk = nxt_queue_first(&app->pending);
-
-        re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
-                                    link_app_pending);
-
-        if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
-
-            nxt_debug(task, "app '%V' stalled request #%uD detected",
-                      &app->name, re_ra->stream);
+        if (main_app_port->app_link.next == NULL) {
+            nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
 
-            cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
-                                              re_ra->stream);
-
-            if (cancelled) {
-                state.req_app_link = re_ra;
-                state.app = app;
-
-                /*
-                 * Need to increment use count "in advance" because
-                 * nxt_router_port_select() will remove re_ra from lists
-                 * and decrement use count.
-                 */
-                nxt_request_app_link_inc_use(re_ra);
-
-                nxt_router_port_select(task, &state);
-
-                goto re_ra_cancelled;
-            }
+            nxt_port_inc_use(main_app_port);
         }
     }
 
-    re_ra = NULL;
-
-re_ra_cancelled:
-
     send_quit = (app->max_requests > 0
-                 && port->app_pending_responses == 0
-                 && port->app_responses >= app->max_requests);
+                 && main_app_port->app_responses >= app->max_requests);
 
     if (send_quit) {
-        port_unchained = nxt_queue_chk_remove(&port->app_link);
+        port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
 
-        port->app = NULL;
+        nxt_port_hash_remove(&app->port_hash, main_app_port);
+        app->port_hash_count--;
+
+        main_app_port->app = NULL;
         app->processes--;
 
     } else {
@@ -4416,9 +4058,10 @@ re_ra_cancelled:
 
     adjust_idle_timer = 0;
 
-    if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
-        && nxt_queue_is_empty(&port->active_websockets)
-        && port->idle_link.next == NULL)
+    if (main_app_port->pair[1] != -1 && !send_quit
+        && main_app_port->active_requests == 0
+        && main_app_port->active_websockets == 0
+        && main_app_port->idle_link.next == NULL)
     {
         if (app->idle_processes == app->spare_processes
             && app->adjust_idle_work.data == NULL)
@@ -4429,12 +4072,12 @@ re_ra_cancelled:
         }
 
         if (app->idle_processes < app->spare_processes) {
-            nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
 
         } else {
-            nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
 
-            port->idle_start = task->thread->engine->timers.now;
+            main_app_port->idle_start = task->thread->engine->timers.now;
         }
 
         app->idle_processes++;
@@ -4447,60 +4090,22 @@ re_ra_cancelled:
         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
     }
 
-    if (pending_ra != NULL) {
-        nxt_request_app_link_use(task, pending_ra, -1);
-    }
-
-    if (re_ra != NULL) {
-        if (nxt_router_port_post_select(task, &state) == NXT_OK) {
-            /*
-             * Reference counter already incremented above, this will
-             * keep re_ra while nxt_router_app_process_request()
-             * task is in queue.  Reference counter decreased in
-             * nxt_router_app_process_request() after processing.
-             */
-
-            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
-                               nxt_router_app_process_request,
-                               &task->thread->engine->task, app, re_ra);
-
-        } else {
-            nxt_request_app_link_use(task, re_ra, -1);
-        }
-    }
-
-    if (req_app_link != NULL) {
-        /*
-         * There should be call nxt_request_app_link_inc_use(req_app_link),
-         * because of one more link in the queue.  But one link was
-         * recently removed from app->requests linked list.
-         * Corresponding decrement is in nxt_router_app_process_request().
-         */
-
-        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
-                           nxt_router_app_process_request,
-                           &task->thread->engine->task, app, req_app_link);
-
-        goto adjust_use;
-    }
-
-    /* ? */
-    if (port->pair[1] == -1) {
+    if (main_app_port->pair[1] == -1) {
         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
-                  &app->name, app, port, port->pid);
+                  &app->name, app, main_app_port, main_app_port->pid);
 
         goto adjust_use;
     }
 
     if (send_quit) {
-        nxt_debug(task, "app '%V' %p send QUIT to port",
-                  &app->name, app);
+        nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
 
-        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-                              -1, 0, 0, NULL);
+        nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+                              NULL);
 
         if (port_unchained) {
-            nxt_port_use(task, port, -1);
+            nxt_port_use(task, main_app_port, -1);
         }
 
         goto adjust_use;
@@ -4529,6 +4134,18 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
 
     nxt_thread_mutex_lock(&app->mutex);
 
+    nxt_port_hash_remove(&app->port_hash, port);
+    app->port_hash_count--;
+
+    if (port->id != 0) {
+        nxt_thread_mutex_unlock(&app->mutex);
+
+        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
+                  port->pid, port->id);
+
+        return;
+    }
+
     unchain = nxt_queue_chk_remove(&port->app_link);
 
     if (nxt_queue_chk_remove(&port->idle_link)) {
@@ -4553,8 +4170,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
 
     start_process = !task->thread->engine->shutdown
                     && nxt_router_app_can_start(app)
-                    && (!nxt_queue_is_empty(&app->requests)
-                        || nxt_router_app_need_start(app));
+                    && nxt_router_app_need_start(app);
 
     if (start_process) {
         app->pending_processes++;
@@ -4603,6 +4219,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
         app->adjust_idle_work.data = NULL;
     }
 
+    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
+              &app->name,
+              (int) app->idle_processes, (int) app->spare_processes);
+
     while (app->idle_processes > app->spare_processes) {
 
         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
@@ -4612,6 +4232,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
 
         timeout = port->idle_start + app->idle_timeout;
 
+        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
+                  &app->name, port->pid,
+                  port->idle_start, timeout, threshold);
+
         if (timeout > threshold) {
             break;
         }
@@ -4621,6 +4245,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
 
         nxt_queue_chk_remove(&port->app_link);
 
+        nxt_port_hash_remove(&app->port_hash, port);
+        app->port_hash_count--;
+
         app->idle_processes--;
         app->processes--;
         port->app = NULL;
@@ -4704,12 +4331,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
     }
 
     nxt_assert(app->processes == 0);
+    nxt_assert(app->active_requests == 0);
+    nxt_assert(app->port_hash_count == 0);
     nxt_assert(app->idle_processes == 0);
-    nxt_assert(nxt_queue_is_empty(&app->requests));
     nxt_assert(nxt_queue_is_empty(&app->ports));
     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
 
+    nxt_port_mmaps_destroy(&app->outgoing, 1);
+
+    nxt_thread_mutex_destroy(&app->outgoing.mutex);
+
+    if (app->shared_port != NULL) {
+        app->shared_port->app = NULL;
+        nxt_port_close(task, app->shared_port);
+        nxt_port_use(task, app->shared_port, -1);
+    }
+
     nxt_thread_mutex_destroy(&app->mutex);
     nxt_mp_destroy(app->mem_pool);
 
@@ -4726,178 +4364,34 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
 
 
 static void
-nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
-{
-    int                     ra_use_delta;
-    nxt_app_t               *app;
-    nxt_bool_t              can_start_process;
-    nxt_request_app_link_t  *req_app_link;
-
-    req_app_link = state->req_app_link;
-    app = state->app;
-
-    state->failed_port_use_delta = 0;
-
-    ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
-
-    if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
-    {
-        nxt_assert(req_app_link->link_app_pending.next != NULL);
-
-        nxt_queue_remove(&req_app_link->link_app_pending);
-        req_app_link->link_app_pending.next = NULL;
-
-        ra_use_delta--;
-    }
-
-    state->failed_port = req_app_link->app_port;
-
-    if (req_app_link->app_port != NULL) {
-        state->failed_port_use_delta--;
-
-        state->failed_port->app_pending_responses--;
-
-        if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
-            state->failed_port_use_delta--;
-        }
-
-        req_app_link->app_port = NULL;
-    }
-
-    can_start_process = nxt_router_app_can_start(app);
-
-    state->port = NULL;
-    state->start_process = 0;
-
-    if (nxt_queue_is_empty(&app->ports)
-        || (can_start_process && nxt_router_app_first_port_busy(app)) )
-    {
-        req_app_link = nxt_request_app_link_alloc(task, req_app_link,
-                                                  req_app_link->req_rpc_data);
-        if (nxt_slow_path(req_app_link == NULL)) {
-            goto fail;
-        }
-
-        if (nxt_slow_path(state->failed_port != NULL)) {
-            nxt_queue_insert_head(&app->requests,
-                                  &req_app_link->link_app_requests);
-
-        } else {
-            nxt_queue_insert_tail(&app->requests,
-                                  &req_app_link->link_app_requests);
-        }
-
-        nxt_request_app_link_inc_use(req_app_link);
-
-        nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
-                  req_app_link->stream);
-
-        if (can_start_process) {
-            app->pending_processes++;
-            state->start_process = 1;
-        }
-
-    } else {
-        state->port = nxt_router_pop_first_port(app);
-
-        if (state->port->app_pending_responses > 1) {
-            req_app_link = nxt_request_app_link_alloc(task, req_app_link,
-                                                     req_app_link->req_rpc_data);
-            if (nxt_slow_path(req_app_link == NULL)) {
-                goto fail;
-            }
-
-            req_app_link->app_port = state->port;
-
-            nxt_request_app_link_pending(task, app, req_app_link);
-        }
-
-        if (can_start_process && nxt_router_app_need_start(app)) {
-            app->pending_processes++;
-            state->start_process = 1;
-        }
-    }
-
-    nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
-
-fail:
-
-    state->shared_ra = req_app_link;
-}
-
-
-static nxt_int_t
-nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+    nxt_request_rpc_data_t *req_rpc_data)
 {
-    nxt_int_t               res;
-    nxt_app_t               *app;
-    nxt_request_app_link_t  *req_app_link;
-
-    req_app_link = state->shared_ra;
-    app = state->app;
-
-    if (state->failed_port_use_delta != 0) {
-        nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
-    }
-
-    if (nxt_slow_path(req_app_link == NULL)) {
-        if (state->port != NULL) {
-            nxt_port_use(task, state->port, -1);
-        }
-
-        nxt_request_app_link_error(task, app, state->req_app_link,
-                                   "Failed to allocate shared req<->app link");
-
-        return NXT_ERROR;
-    }
-
-    if (state->port != NULL) {
-        nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
+    nxt_bool_t  start_process;
+    nxt_port_t  *port;
 
-        req_app_link->app_port = state->port;
+    start_process = 0;
 
-        if (state->start_process) {
-            nxt_router_start_app_process(task, app);
-        }
+    nxt_thread_mutex_lock(&app->mutex);
 
-        return NXT_OK;
-    }
+    port = app->shared_port;
+    nxt_port_inc_use(port);
 
-    if (!state->start_process) {
-        nxt_debug(task, "app '%V' %p too many running or pending processes",
-                  &app->name, app);
+    app->active_requests++;
 
-        return NXT_AGAIN;
+    if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+        app->pending_processes++;
+        start_process = 1;
     }
 
-    res = nxt_router_start_app_process(task, app);
+    nxt_thread_mutex_unlock(&app->mutex);
 
-    if (nxt_slow_path(res != NXT_OK)) {
-        nxt_request_app_link_error(task, app, req_app_link,
-                                   "Failed to start app process");
+    req_rpc_data->app_port = port;
+    req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
 
-        return NXT_ERROR;
+    if (start_process) {
+        nxt_router_start_app_process(task, app);
     }
-
-    return NXT_AGAIN;
-}
-
-
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
-    nxt_request_app_link_t *req_app_link)
-{
-    nxt_port_select_state_t  state;
-
-    state.req_app_link = req_app_link;
-    state.app = app;
-
-    nxt_thread_mutex_lock(&app->mutex);
-
-    nxt_router_port_select(task, &state);
-
-    nxt_thread_mutex_unlock(&app->mutex);
-
-    return nxt_router_port_post_select(task, &state);
 }
 
 
@@ -4905,10 +4399,7 @@ void
 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
     nxt_app_t *app)
 {
-    nxt_int_t               res;
-    nxt_port_t              *port;
     nxt_event_engine_t      *engine;
-    nxt_request_app_link_t  ra_local, *req_app_link;
     nxt_request_rpc_data_t  *req_rpc_data;
 
     engine = task->thread->engine;
@@ -4927,7 +4418,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
      * in port handlers.  Need to fixup request memory pool.  Counterpart
      * release will be called via following call chain:
      *  nxt_request_rpc_data_unlink() ->
-     *      nxt_router_http_request_done() ->
+     *      nxt_router_http_request_release_post() ->
      *          nxt_router_http_request_release()
      */
     nxt_mp_retain(r->mem_pool);
@@ -4939,29 +4430,37 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
 
     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
     req_rpc_data->app = app;
+    req_rpc_data->msg_info.body_fd = -1;
+    req_rpc_data->rpc_cancel = 1;
 
     nxt_router_app_use(task, app, 1);
 
     req_rpc_data->request = r;
     r->req_rpc_data = req_rpc_data;
 
-    req_app_link = &ra_local;
-    nxt_request_app_link_init(task, req_app_link, req_rpc_data);
+    if (r->last != NULL) {
+        r->last->completion_handler = nxt_router_http_request_done;
+    }
 
-    res = nxt_router_app_port(task, app, req_app_link);
-    req_app_link = req_rpc_data->req_app_link;
+    nxt_router_app_port_get(task, app, req_rpc_data);
+    nxt_router_app_prepare_request(task, req_rpc_data);
+}
 
-    if (res == NXT_OK) {
-        port = req_app_link->app_port;
-        nxt_assert(port != NULL);
 
+static void
+nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
+{
+    nxt_http_request_t  *r;
 
-        nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+    r = data;
 
-        nxt_router_app_prepare_request(task, req_app_link);
+    nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
+
+    if (r->req_rpc_data) {
+        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }
 
-    nxt_request_app_link_use(task, req_app_link, -1);
+    nxt_http_request_close_handler(task, r, r->proto.any);
 }
 
 
@@ -4973,76 +4472,80 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
 
 static void
 nxt_router_app_prepare_request(nxt_task_t *task,
-    nxt_request_app_link_t *req_app_link)
+    nxt_request_rpc_data_t *req_rpc_data)
 {
-    nxt_buf_t         *buf;
+    nxt_app_t         *app;
+    nxt_buf_t         *buf, *body;
     nxt_int_t         res;
     nxt_port_t        *port, *reply_port;
-    nxt_apr_action_t  apr_action;
 
-    nxt_assert(req_app_link->app_port != NULL);
+    app = req_rpc_data->app;
 
-    port = req_app_link->app_port;
-    reply_port = req_app_link->reply_port;
+    nxt_assert(app != NULL);
 
-    apr_action = NXT_APR_REQUEST_FAILED;
+    port = req_rpc_data->app_port;
 
-    buf = nxt_router_prepare_msg(task, req_app_link->request, port,
-                                 nxt_app_msg_prefix[port->app->type]);
+    nxt_assert(port != NULL);
 
+    reply_port = task->thread->engine->port;
+
+    buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
+                                 nxt_app_msg_prefix[app->type]);
     if (nxt_slow_path(buf == NULL)) {
-        nxt_request_app_link_error(task, port->app, req_app_link,
-                                 "Failed to prepare message for application");
-        goto release_port;
+        nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
+                  req_rpc_data->stream, &app->name);
+
+        nxt_http_request_error(task, req_rpc_data->request,
+                               NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+        return;
     }
 
     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
               nxt_buf_used_size(buf),
               port->socket.fd);
 
-    apr_action = NXT_APR_NEW_PORT;
-
-    req_app_link->msg_info.buf = buf;
-    req_app_link->msg_info.completion_handler = buf->completion_handler;
+    req_rpc_data->msg_info.buf = buf;
+    req_rpc_data->msg_info.completion_handler = buf->completion_handler;
 
-    for (; buf; buf = buf->next) {
+    do {
         buf->completion_handler = nxt_router_dummy_buf_completion;
-    }
+        buf = buf->next;
+    } while (buf != NULL);
 
-    buf = req_app_link->msg_info.buf;
+    buf = req_rpc_data->msg_info.buf;
 
-    res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
-                                     &req_app_link->msg_info.tracking,
-                                     req_app_link->stream);
-    if (nxt_slow_path(res != NXT_OK)) {
-        nxt_request_app_link_error(task, port->app, req_app_link,
-                                   "Failed to get tracking area");
-        goto release_port;
-    }
+    body = req_rpc_data->request->body;
 
-    if (req_app_link->body_fd != -1) {
-        nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
-                  req_app_link->body_fd);
+    if (body != NULL && nxt_buf_is_file(body)) {
+        req_rpc_data->msg_info.body_fd = body->file->fd;
+
+        body->file->fd = -1;
 
-        lseek(req_app_link->body_fd, 0, SEEK_SET);
+    } else {
+        req_rpc_data->msg_info.body_fd = -1;
     }
 
-    res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
-                                 req_app_link->body_fd,
-                                 req_app_link->stream, reply_port->id, buf,
-                                 &req_app_link->msg_info.tracking);
+    if (req_rpc_data->msg_info.body_fd != -1) {
+        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+                  req_rpc_data->msg_info.body_fd);
 
-    if (nxt_slow_path(res != NXT_OK)) {
-        nxt_request_app_link_error(task, port->app, req_app_link,
-                                   "Failed to send message to application");
-        goto release_port;
+        lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
     }
 
-release_port:
+    res = nxt_port_socket_twrite(task, port,
+                                 NXT_PORT_MSG_REQ_HEADERS,
+                                 req_rpc_data->msg_info.body_fd,
+                                 req_rpc_data->stream, reply_port->id, buf,
+                                 NULL);
 
-    nxt_router_app_port_release(task, port, apr_action);
+    if (nxt_slow_path(res != NXT_OK)) {
+        nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
+                  req_rpc_data->stream, &app->name);
 
-    nxt_request_app_link_update_peer(task, req_app_link);
+        nxt_http_request_error(task, req_rpc_data->request,
+                               NXT_HTTP_INTERNAL_SERVER_ERROR);
+    }
 }
 
 
@@ -5100,7 +4603,7 @@ nxt_fields_next(nxt_fields_iter_t *i)
 
 static nxt_buf_t *
 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
-    nxt_port_t *port, const nxt_str_t *prefix)
+    nxt_app_t *app, const nxt_str_t *prefix)
 {
     void                *target_pos, *query_pos;
     u_char              *pos, *end, *p, c;
@@ -5141,7 +4644,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
         return NULL;
     }
 
-    out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
+    out = nxt_port_mmap_get_buf(task, &app->outgoing,
               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
     if (nxt_slow_path(out == NULL)) {
         return NULL;
@@ -5323,8 +4826,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
             if (buf == NULL) {
                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
 
-                buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
-                                            free_size);
+                buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
                 if (nxt_slow_path(buf == NULL)) {
                     while (out != NULL) {
                         buf = out->next;
@@ -5372,15 +4874,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
 static void
 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
 {
-    nxt_app_t                *app;
-    nxt_bool_t               cancelled, unlinked;
-    nxt_port_t               *port;
     nxt_timer_t              *timer;
-    nxt_queue_link_t         *lnk;
     nxt_http_request_t       *r;
-    nxt_request_app_link_t   *pending_ra;
     nxt_request_rpc_data_t   *req_rpc_data;
-    nxt_port_select_state_t  state;
 
     timer = obj;
 
@@ -5388,94 +4884,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
 
     r = nxt_timer_data(timer, nxt_http_request_t, timer);
     req_rpc_data = r->timer_data;
-    app = req_rpc_data->app;
-
-    if (app == NULL) {
-        goto generate_error;
-    }
-
-    port = NULL;
-    pending_ra = NULL;
-
-    if (req_rpc_data->app_port != NULL) {
-        port = req_rpc_data->app_port;
-        req_rpc_data->app_port = NULL;
-    }
-
-    if (port == NULL && req_rpc_data->req_app_link != NULL
-        && req_rpc_data->req_app_link->app_port != NULL)
-    {
-        port = req_rpc_data->req_app_link->app_port;
-        req_rpc_data->req_app_link->app_port = NULL;
-    }
-
-    if (port == NULL) {
-        goto generate_error;
-    }
-
-    nxt_thread_mutex_lock(&app->mutex);
-
-    unlinked = nxt_queue_chk_remove(&port->app_link);
-
-    if (!nxt_queue_is_empty(&port->pending_requests)) {
-        lnk = nxt_queue_first(&port->pending_requests);
-
-        pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
-                                         link_port_pending);
-
-        nxt_assert(pending_ra->link_app_pending.next != NULL);
-
-        nxt_debug(task, "app '%V' pending request #%uD found",
-                  &app->name, pending_ra->stream);
-
-        cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
-                                          pending_ra->stream);
-
-        if (cancelled) {
-            state.req_app_link = pending_ra;
-            state.app = app;
-
-            /*
-             * Need to increment use count "in advance" because
-             * nxt_router_port_select() will remove pending_ra from lists
-             * and decrement use count.
-             */
-            nxt_request_app_link_inc_use(pending_ra);
-
-            nxt_router_port_select(task, &state);
-
-        } else {
-            pending_ra = NULL;
-        }
-    }
-
-    nxt_thread_mutex_unlock(&app->mutex);
-
-    if (pending_ra != NULL) {
-        if (nxt_router_port_post_select(task, &state) == NXT_OK) {
-            /*
-             * Reference counter already incremented above, this will
-             * keep pending_ra while nxt_router_app_process_request()
-             * task is in queue.  Reference counter decreased in
-             * nxt_router_app_process_request() after processing.
-             */
-
-            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
-                               nxt_router_app_process_request,
-                               &task->thread->engine->task, app, pending_ra);
-
-        } else {
-            nxt_request_app_link_use(task, pending_ra, -1);
-        }
-    }
-
-    nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
-
-    nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
-    nxt_port_use(task, port, unlinked ? -2 : -1);
-
-generate_error:
 
     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
 
@@ -5483,13 +4891,11 @@ generate_error:
 }
 
 
-static nxt_int_t
-nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r)
+static void
+nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
 {
     r->timer.handler = nxt_router_http_request_release;
     nxt_timer_add(task->thread->engine, &r->timer, 0);
-
-    return NXT_OK;
 }
 
 
@@ -5498,7 +4904,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
 {
     nxt_http_request_t  *r;
 
-    nxt_debug(task, "http app release");
+    nxt_debug(task, "http request pool release");
 
     r = nxt_timer_data(obj, nxt_http_request_t, timer);
 
@@ -5593,7 +4999,18 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 
     nxt_assert(port->type == NXT_PROCESS_APP);
 
-    mmaps = &port->process->outgoing;
+    if (nxt_slow_path(port->app == NULL)) {
+        nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
+                  port->pid, port->id);
+
+        // FIXME
+        nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+                              -1, msg->port_msg.stream, 0, NULL);
+
+        return;
+    }
+
+    mmaps = &port->app->outgoing;
     nxt_thread_mutex_lock(&mmaps->mutex);
 
     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
@@ -5602,6 +5019,9 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
                   (int) get_mmap_msg->id);
 
+        // FIXME
+        nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+                              -1, msg->port_msg.stream, 0, NULL);
         return;
     }
-- 
cgit 

From f4a118f84ae7c8b9e67e2c461087dd0986664574 Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Tue, 11 Aug 2020 19:20:20 +0300
Subject: Adding debug messages to catch process management issues.

---
 src/nxt_router.c | 38 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44b303e4..922f15cd 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -437,6 +437,8 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
     nxt_port_t     *router_port;
     nxt_runtime_t  *rt;
 
+    nxt_debug(task, "app '%V' start process", &app->name);
+
     rt = task->thread->runtime;
     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
 
@@ -2267,6 +2269,10 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 
     nxt_queue_insert_tail(&app->ports, &port->app_link);
     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+
+    nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
+              &app->name, port->pid, port->id);
+
     nxt_port_hash_add(&app->port_hash, port);
     app->port_hash_count++;
 
@@ -3690,6 +3696,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
         app->idle_processes--;
 
+        nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
+                  &app->name, main_app_port->pid, main_app_port->id,
+                  (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
+
         /* Check port was in 'spare_ports' using idle_start field. */
         if (main_app_port->idle_start == 0
             && app->idle_processes >= app->spare_processes)
@@ -3707,6 +3717,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
 
             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
             idle_port->idle_start = 0;
+
+            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
+                      "to spare_ports",
+                      &app->name, idle_port->pid, idle_port->id);
         }
     }
 
     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
@@ -3925,7 +3939,7 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
 
 
 nxt_inline nxt_port_t *
-nxt_router_app_get_port_for_quit(nxt_app_t *app)
+nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
 {
     nxt_port_t  *port;
 
@@ -3940,6 +3954,10 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
 
         if (nxt_queue_chk_remove(&port->idle_link)) {
             app->idle_processes--;
+
+            nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
+                      &app->name, port->pid, port->id,
+                      (port->idle_start ? "idle_ports" : "spare_ports"));
         }
 
         nxt_port_hash_remove(&app->port_hash, port);
@@ -4074,10 +4092,15 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
 
         if (app->idle_processes < app->spare_processes) {
             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
+
+            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
+                      &app->name, main_app_port->pid, main_app_port->id);
 
         } else {
             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
 
             main_app_port->idle_start = task->thread->engine->timers.now;
+
+            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
+                      &app->name, main_app_port->pid, main_app_port->id);
         }
 
         app->idle_processes++;
@@ -4151,6 +4174,10 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
     if (nxt_queue_chk_remove(&port->idle_link)) {
         app->idle_processes--;
 
+        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
+                  &app->name, port->pid, port->id,
+                  (port->idle_start ? "idle_ports" : "spare_ports"));
+
         if (port->idle_start == 0
             && app->idle_processes >= app->spare_processes)
         {
@@ -4163,6 +4190,10 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
 
             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
             idle_port->idle_start = 0;
+
+            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
+                      "to spare_ports",
+                      &app->name, idle_port->pid, idle_port->id);
         }
     }
 
@@ -4243,6 +4274,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
         nxt_queue_remove(lnk);
         lnk->next = NULL;
 
+        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
+                  &app->name, port->pid, port->id);
+
         nxt_queue_chk_remove(&port->app_link);
 
         nxt_port_hash_remove(&app->port_hash, port);
@@ -4318,7 +4352,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
     app = app_joint->app;
 
     for ( ;; ) {
-        port = nxt_router_app_get_port_for_quit(app);
+        port = nxt_router_app_get_port_for_quit(task, app);
         if (port == NULL) {
             break;
         }
-- 
cgit 

From 72475ee11c4254086e5d5648c86498833bf8e939 Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Tue, 11 Aug 2020 19:20:28 +0300
Subject: Made router port message handlers into static functions.

Mostly harmless.
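
The change is easy to illustrate in isolation: handlers that are only ever
reached through a function-pointer table in the same file need no external
linkage, so a forward declaration plus static keeps them private to the
translation unit while still letting the table appear before the definitions.
A minimal, self-contained sketch of the pattern (the types and names here are
simplified stand-ins, not the actual Unit API):

    #include <stdio.h>

    typedef struct {
        int  type;
    } msg_t;

    /* Forward declarations: the table below can reference the
     * handlers before their definitions appear. */
    static void new_port_handler(msg_t *msg);
    static void remove_pid_handler(msg_t *msg);

    typedef void (*msg_handler_t)(msg_t *msg);

    static const msg_handler_t  handlers[] = {
        new_port_handler,      /* msg->type == 0 */
        remove_pid_handler,    /* msg->type == 1 */
    };

    static void
    new_port_handler(msg_t *msg)
    {
        printf("new port message, type %d\n", msg->type);
    }

    static void
    remove_pid_handler(msg_t *msg)
    {
        printf("remove pid message, type %d\n", msg->type);
    }

    int
    main(void)
    {
        msg_t  m = { 1 };

        handlers[m.type](&m);
        return 0;
    }

Nothing observable changes at runtime, which is presumably why the commit
message calls it "mostly harmless": the symbols simply stop leaking into the
global namespace.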
---
 src/nxt_router.c | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index 922f15cd..b8e94bcc 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -69,6 +69,15 @@ static void nxt_router_greet_controller(nxt_task_t *task,
 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task,
     nxt_app_t *app);
 
+static void nxt_router_new_port_handler(nxt_task_t *task,
+    nxt_port_recv_msg_t *msg);
+static void nxt_router_conf_data_handler(nxt_task_t *task,
+    nxt_port_recv_msg_t *msg);
+static void nxt_router_remove_pid_handler(nxt_task_t *task,
+    nxt_port_recv_msg_t *msg);
+static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
+    nxt_port_recv_msg_t *msg);
+
 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
 static void nxt_router_conf_ready(nxt_task_t *task,
@@ -561,7 +570,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
 }
 
 
-void
+static void
 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 {
     nxt_app_t      *app;
@@ -623,7 +632,7 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 }
 
 
-void
+static void
 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 {
     void  *p;
@@ -708,7 +717,7 @@ nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
 }
 
 
-void
+static void
 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 {
     nxt_event_engine_t  *engine;
@@ -3323,7 +3332,7 @@ typedef struct {
 } nxt_router_access_log_reopen_t;
 
 
-void
+static void
 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 {
     nxt_mp_t  *mp;
-- 
cgit 

From e227fc9e6281c280c46139a81646ecd7b0510e2b Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Tue, 11 Aug 2020 19:20:34 +0300
Subject: Introducing application and port shared memory queues.

The goal is to minimize the number of syscalls needed to deliver a message.
---
 src/nxt_router.c | 201 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 179 insertions(+), 22 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index b8e94bcc..3dd0878b 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -15,6 +15,8 @@
 #include
 #include
 #include
+#include
+#include
 
 
 typedef struct {
     nxt_str_t  type;
@@ -92,6 +94,12 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
+    nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
+    nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
+    nxt_port_t *port, nxt_fd_t fd);
 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
 static void nxt_router_listen_socket_ready(nxt_task_t *task,
@@ -473,21 +481,25 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
 
 
 nxt_inline nxt_bool_t
-nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
-    uint32_t stream)
+nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
 {
-    nxt_buf_t   *b, *next;
-    nxt_bool_t  cancelled;
+    nxt_buf_t       *b, *next;
+    nxt_bool_t      cancelled;
+    nxt_msg_info_t  *msg_info;
+
+    msg_info = &req_rpc_data->msg_info;
 
     if (msg_info->buf == NULL) {
         return 0;
     }
 
-    cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
-                                              stream);
+    cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
+                                     msg_info->tracking_cookie,
+                                     req_rpc_data->stream);
 
     if (cancelled) {
-        nxt_debug(task, "stream #%uD: cancelled by router", stream);
+        nxt_debug(task, "stream #%uD: cancelled by router",
+                  req_rpc_data->stream);
     }
 
     for (b = msg_info->buf; b != NULL; b = next) {
@@ -529,7 +541,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
 {
     nxt_http_request_t  *r;
 
-    nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
+    nxt_router_msg_cancel(task, req_rpc_data);
 
     if (req_rpc_data->app_port != NULL) {
         nxt_router_app_port_release(task, req_rpc_data->app_port,
@@ -573,6 +585,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
 static void
 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 {
+    nxt_int_t      res;
    nxt_app_t      *app;
    nxt_port_t     *port, *main_app_port;
    nxt_runtime_t  *rt;
@@ -592,6 +605,17 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
         }
 
         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
+
+    } else {
+        if (msg->fd2 != -1) {
+            res = nxt_router_port_queue_map(task, port, msg->fd2);
+            if (nxt_slow_path(res != NXT_OK)) {
+                return;
+            }
+
+            nxt_fd_close(msg->fd2);
+            msg->fd2 = -1;
+        }
     }
 
     if (msg->port_msg.stream != 0) {
@@ -1523,6 +1547,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
             return NXT_ERROR;
         }
 
+        ret = nxt_router_app_queue_init(task, port);
+        if (nxt_slow_path(ret != NXT_OK)) {
+            nxt_port_use(task, port, -1);
+            return NXT_ERROR;
+        }
+
         nxt_port_write_enable(task, port);
         port->app = app;
 
@@ -1828,6 +1858,82 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
 }
 
 
+static nxt_int_t
+nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+    void       *mem;
+    nxt_int_t  fd;
+
+    fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
+    if (nxt_slow_path(fd == -1)) {
+        return NXT_ERROR;
+    }
+
+    mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
+                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    if (nxt_slow_path(mem == MAP_FAILED)) {
+        nxt_fd_close(fd);
+
+        return NXT_ERROR;
+    }
+
+    nxt_app_queue_init(mem);
+
+    port->queue_fd = fd;
+    port->queue = mem;
+
+    return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+    void       *mem;
+    nxt_int_t  fd;
+
+    fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
+    if (nxt_slow_path(fd == -1)) {
+        return NXT_ERROR;
+    }
+
+    mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    if (nxt_slow_path(mem == MAP_FAILED)) {
+        nxt_fd_close(fd);
+
+        return NXT_ERROR;
+    }
+
+    nxt_port_queue_init(mem);
+
+    port->queue_fd = fd;
+    port->queue = mem;
+
+    return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
+{
+    void  *mem;
+
+    nxt_assert(fd != -1);
+
+    mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+                       PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    if (nxt_slow_path(mem == MAP_FAILED)) {
+
+        return NXT_ERROR;
+    }
+
+    port->queue = mem;
+
+    return NXT_OK;
+}
+
+
 void
 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
     nxt_http_action_t *action)
@@ -2748,6 +2854,12 @@ nxt_router_thread_start(void *data)
         return;
     }
 
+    ret = nxt_router_port_queue_init(task, port);
+    if (nxt_slow_path(ret != NXT_OK)) {
+        nxt_port_use(task, port, -1);
+        return;
+    }
+
     engine->port = port;
 
     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
@@ -3670,6 +3782,7 @@ static void
 nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
     nxt_request_rpc_data_t *req_rpc_data)
 {
+    int                 res;
     nxt_app_t           *app;
     nxt_bool_t          start_process;
     nxt_port_t          *app_port, *main_app_port, *idle_port;
@@ -3752,6 +3865,24 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
 
     req_rpc_data->app_port = app_port;
 
+    if (req_rpc_data->msg_info.body_fd != -1) {
+        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+                  req_rpc_data->msg_info.body_fd);
+
+        lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
+
+        res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
+                                    req_rpc_data->msg_info.body_fd,
+                                    req_rpc_data->stream,
+                                    task->thread->engine->port->id, NULL);
+
+        if (nxt_slow_path(res != NXT_OK)) {
+            r = req_rpc_data->request;
+
+            nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+        }
+    }
+
     if (app->timeout != 0) {
         r = req_rpc_data->request;
 
@@ -3886,10 +4017,10 @@ nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
     msg->max_share = port->max_share;
     msg->type = port->type;
 
-    return nxt_port_socket_twrite(task, app_port,
+    return nxt_port_socket_write2(task, app_port,
                                   NXT_PORT_MSG_NEW_PORT,
-                                  port->pair[0],
-                                  0, 0, b, NULL);
+                                  port->pair[0], port->queue_fd,
+                                  0, 0, b);
 }
 
 
@@ -4522,6 +4653,13 @@ nxt_router_app_prepare_request(nxt_task_t *task,
     nxt_int_t         res;
     nxt_port_t        *port, *reply_port;
 
+    int  notify;
+    struct {
+        nxt_port_msg_t       pm;
+        nxt_port_mmap_msg_t  mm;
+    } msg;
+
+
     app = req_rpc_data->app;
 
     nxt_assert(app != NULL);
@@ -4529,6 +4667,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
     port = req_rpc_data->app_port;
 
     nxt_assert(port != NULL);
+    nxt_assert(port->queue != NULL);
 
     reply_port = task->thread->engine->port;
 
@@ -4569,20 +4708,38 @@ nxt_router_app_prepare_request(nxt_task_t *task,
 
-    if (req_rpc_data->msg_info.body_fd != -1) {
-        nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
-                  req_rpc_data->msg_info.body_fd);
+    msg.pm.stream = req_rpc_data->stream;
+    msg.pm.pid = reply_port->pid;
+    msg.pm.reply_port = reply_port->id;
+    msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
+    msg.pm.last = 0;
+    msg.pm.mmap = 1;
+    msg.pm.nf = 0;
+    msg.pm.mf = 0;
+    msg.pm.tracking = 0;
 
-        lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
-    }
+    nxt_port_mmap_handler_t *mmap_handler = buf->parent;
+    nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
+
+    msg.mm.mmap_id = hdr->id;
+    msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
+    msg.mm.size = nxt_buf_used_size(buf);
 
-    res = nxt_port_socket_twrite(task, port,
-                                 NXT_PORT_MSG_REQ_HEADERS,
-                                 req_rpc_data->msg_info.body_fd,
-                                 req_rpc_data->stream, reply_port->id, buf,
-                                 NULL);
+    res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
+                             req_rpc_data->stream, &notify,
+                             &req_rpc_data->msg_info.tracking_cookie);
+    if (nxt_fast_path(res == NXT_OK)) {
+        if (notify != 0) {
+            (void) nxt_port_socket_write(task, port,
+                                         NXT_PORT_MSG_READ_QUEUE,
+                                         -1, req_rpc_data->stream,
+                                         reply_port->id, NULL);
 
-    if (nxt_slow_path(res != NXT_OK)) {
+        } else {
+            nxt_debug(task, "queue is not empty");
+        }
+
+    } else {
         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
                   req_rpc_data->stream, &app->name);
-- 
cgit 

From f147943f6382c0e90a216615ff9bcf57a3db8c75 Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Tue, 11 Aug 2020 21:48:27 +0300
Subject: Style fixes for transferring two file descriptors over a port.

The two consecutive fd and fd2 fields are replaced with an array.
---
 src/nxt_router.c | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index 3dd0878b..df0d96ad 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -607,14 +607,14 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
 
     } else {
-        if (msg->fd2 != -1) {
-            res = nxt_router_port_queue_map(task, port, msg->fd2);
+        if (msg->fd[1] != -1) {
+            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
             if (nxt_slow_path(res != NXT_OK)) {
                 return;
             }
 
-            nxt_fd_close(msg->fd2);
-            msg->fd2 = -1;
+            nxt_fd_close(msg->fd[1]);
+            msg->fd[1] = -1;
         }
     }
 
@@ -669,7 +669,7 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
         return;
     }
 
-    if (nxt_slow_path(msg->fd == -1)) {
+    if (nxt_slow_path(msg->fd[0] == -1)) {
         nxt_alert(task, "conf_data_handler: invalid file shm fd");
         return;
     }
@@ -678,18 +678,18 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
 
-        nxt_fd_close(msg->fd);
-        msg->fd = -1;
+        nxt_fd_close(msg->fd[0]);
+        msg->fd[0] = -1;
 
         return;
     }
 
     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
 
-    p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0);
+    p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
 
-    nxt_fd_close(msg->fd);
-    msg->fd = -1;
+    nxt_fd_close(msg->fd[0]);
+    msg->fd[0] = -1;
 
     if (nxt_slow_path(p == MAP_FAILED)) {
         return;
@@ -2133,7 +2133,7 @@ nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 
     rpc = data;
 
-    s = msg->fd;
+    s = msg->fd[0];
 
     ret = nxt_socket_nonblocking(task, s);
     if (nxt_slow_path(ret != NXT_OK)) {
@@ -2271,7 +2271,7 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
         goto fail;
     }
 
-    tlscf->chain_file = msg->fd;
+    tlscf->chain_file = msg->fd[0];
 
     ret = task->thread->runtime->tls->server_init(task, tlscf);
     if (nxt_slow_path(ret != NXT_OK)) {
         goto fail;
     }
@@ -3392,7 +3392,7 @@ nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 
     access_log = tmcf->router_conf->access_log;
 
-    access_log->fd = msg->fd;
+    access_log->fd = msg->fd[0];
 
     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                        nxt_router_conf_apply, task, tmcf, NULL);
@@ -3541,13 +3541,13 @@ nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 
     if (access_log == nxt_router->access_log) {
 
-        if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) {
+        if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
             nxt_alert(task, "dup2(%FD, %FD) failed %E",
-                      msg->fd, access_log->fd, nxt_errno);
+                      msg->fd[0], access_log->fd, nxt_errno);
         }
     }
 
-    nxt_fd_close(msg->fd);
+    nxt_fd_close(msg->fd[0]);
     nxt_mp_release(reopen->mem_pool);
 }
-- 
cgit 

From 09685e2b4143ec19afef7673a455cf7e4d1414b7 Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Wed, 12 Aug 2020 15:25:29 +0300
Subject: Responding with an error in case of a first process start failure.

After the shared application port was introduced, the request queue in the
router was removed, so requests could get stuck forever waiting for another
process to start.
---
 src/nxt_router.c | 142 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 124 insertions(+), 18 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index df0d96ad..0ccf6593 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
     nxt_apr_action_t action);
 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
     nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
+    void *data);
 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
     void *data);
 
@@ -539,6 +541,8 @@ nxt_inline void
 nxt_request_rpc_data_unlink(nxt_task_t *task,
     nxt_request_rpc_data_t *req_rpc_data)
 {
+    nxt_app_t           *app;
+    nxt_bool_t          unlinked;
     nxt_http_request_t  *r;
 
     nxt_router_msg_cancel(task, req_rpc_data);
@@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
         req_rpc_data->app_port = NULL;
     }
 
-    if (req_rpc_data->app != NULL) {
-        nxt_router_app_use(task, req_rpc_data->app, -1);
-
-        req_rpc_data->app = NULL;
-    }
-
+    app = req_rpc_data->app;
     r = req_rpc_data->request;
 
     if (r != NULL) {
@@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
 
         r->req_rpc_data = NULL;
         req_rpc_data->request = NULL;
+
+        if (app != NULL) {
+            unlinked = 0;
+
+            nxt_thread_mutex_lock(&app->mutex);
+
+            if (r->app_link.next != NULL) {
+                nxt_queue_remove(&r->app_link);
+                r->app_link.next = NULL;
+
+                unlinked = 1;
+            }
+
+            nxt_thread_mutex_unlock(&app->mutex);
+
+            if (unlinked) {
+                nxt_mp_release(r->mem_pool);
+            }
+        }
+    }
+
+    if (app != NULL) {
+        nxt_router_app_use(task, app, -1);
+
+        req_rpc_data->app = NULL;
     }
 
     if (req_rpc_data->msg_info.body_fd != -1) {
@@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
         nxt_queue_init(&app->ports);
         nxt_queue_init(&app->spare_ports);
         nxt_queue_init(&app->idle_ports);
+        nxt_queue_init(&app->ack_waiting_req);
 
         app->name.length = name.length;
         nxt_memcpy(app->name.start, name.start, name.length);
@@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
 {
     int                 res;
     nxt_app_t           *app;
-    nxt_bool_t          start_process;
+    nxt_bool_t          start_process, unlinked;
     nxt_port_t          *app_port, *main_app_port, *idle_port;
    nxt_queue_link_t    *idle_lnk;
    nxt_http_request_t  *r;
@@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
               msg->port_msg.pid);
 
     app = req_rpc_data->app;
+    r = req_rpc_data->request;
 
     start_process = 0;
+    unlinked = 0;
 
     nxt_thread_mutex_lock(&app->mutex);
 
+    if (r->app_link.next != NULL) {
+        nxt_queue_remove(&r->app_link);
+        r->app_link.next = NULL;
+
+        unlinked = 1;
+    }
+
     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
                                   msg->port_msg.reply_port);
 
     if (nxt_slow_path(app_port == NULL)) {
         nxt_thread_mutex_unlock(&app->mutex);
 
-        r = req_rpc_data->request;
 
         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
 
+        if (unlinked) {
+            nxt_mp_release(r->mem_pool);
+        }
+
         return;
     }
 
@@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
 
     nxt_thread_mutex_unlock(&app->mutex);
 
+    if (unlinked) {
+        nxt_mp_release(r->mem_pool);
+    }
+
     if (start_process) {
         nxt_router_start_app_process(task, app);
     }
@@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
                                     task->thread->engine->port->id, NULL);
 
         if (nxt_slow_path(res != NXT_OK)) {
-            r = req_rpc_data->request;
-
             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
         }
     }
 
     if (app->timeout != 0) {
-        r = req_rpc_data->request;
-
         r->timer.handler = nxt_router_app_timeout;
         r->timer_data = req_rpc_data;
         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
@@ -4028,8 +4065,10 @@ static void
 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
     void *data)
 {
-    nxt_app_t        *app;
-    nxt_app_joint_t  *app_joint;
+    nxt_app_t           *app;
+    nxt_app_joint_t     *app_joint;
+    nxt_queue_link_t    *link;
+    nxt_http_request_t  *r;
 
     app_joint = data;
 
@@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 
     nxt_debug(task, "app '%V' %p start error", &app->name, app);
 
+    link = NULL;
+
     nxt_thread_mutex_lock(&app->mutex);
 
     nxt_assert(app->pending_processes != 0);
 
     app->pending_processes--;
 
+    if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
+        link = nxt_queue_first(&app->ack_waiting_req);
+
+        nxt_queue_remove(link);
+        link->next = NULL;
+    }
+
     nxt_thread_mutex_unlock(&app->mutex);
 
-    /* TODO req_app_link to cancel first pending message */
+    while (link != NULL) {
+        r = nxt_container_of(link, nxt_http_request_t, app_link);
+
+        nxt_event_engine_post(r->engine, &r->err_work);
+
+        link = NULL;
+
+        nxt_thread_mutex_lock(&app->mutex);
+
+        if (app->processes == 0 && app->pending_processes == 0
+            && !nxt_queue_is_empty(&app->ack_waiting_req))
+        {
+            link = nxt_queue_first(&app->ack_waiting_req);
+
+            nxt_queue_remove(link);
+            link->next = NULL;
+        }
+
+        nxt_thread_mutex_unlock(&app->mutex);
+    }
 }
 
@@ -4541,8 +4608,9 @@ static void
 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
     nxt_request_rpc_data_t *req_rpc_data)
 {
-    nxt_bool_t  start_process;
-    nxt_port_t  *port;
+    nxt_bool_t          start_process;
+    nxt_port_t          *port;
+    nxt_http_request_t  *r;
 
     start_process = 0;
 
@@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
         start_process = 1;
     }
 
+    r = req_rpc_data->request;
+
+    /*
+     * Put request into application-wide list to be able to cancel request
+     * if something goes wrong with application processes.
+     */
+    nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
+
     nxt_thread_mutex_unlock(&app->mutex);
 
+    /*
+     * Retain request memory pool while request is linked in ack_waiting_req
+     * to guarantee request structure memory is accessble.
+     */
+    nxt_mp_retain(r->mem_pool);
+
     req_rpc_data->app_port = port;
     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
 
@@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
     r->timer.log = engine->task.log;
     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
 
+    r->engine = engine;
+    r->err_work.handler = nxt_router_http_request_error;
+    r->err_work.task = task;
+    r->err_work.obj = r;
+
     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
     req_rpc_data->app = app;
     req_rpc_data->msg_info.body_fd = -1;
@@ -4621,6 +4708,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
 }
 
 
+static void
+nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
+{
+    nxt_http_request_t  *r;
+
+    r = obj;
+
+    nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
+
+    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+    if (r->req_rpc_data != NULL) {
+        nxt_request_rpc_data_unlink(task, r->req_rpc_data);
+    }
+
+    nxt_mp_release(r->mem_pool);
+}
+
+
 static void
 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 {
@@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
 
     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
 
-    if (r->req_rpc_data) {
+    if (r->req_rpc_data != NULL) {
         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
     }
-- 
cgit 

From 93146616cf56a94fc2979cb978c7b451c5592594 Mon Sep 17 00:00:00 2001
From: Valentin Bartenev
Date: Thu, 13 Aug 2020 02:46:54 +0300
Subject: Basic variables support.

---
 src/nxt_router.c | 183 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 139 insertions(+), 44 deletions(-)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index 0ccf6593..1318eeb4 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -93,7 +93,16 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
+
 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
+static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
+    nxt_app_t *app);
+static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
+    nxt_str_t *name);
+static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
+    int i);
+
 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
     nxt_port_t *port);
 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
@@ -198,6 +207,7 @@ static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
 static void nxt_router_app_port_error(nxt_task_t *task,
     nxt_port_recv_msg_t *msg, void *data);
 
+static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
 
 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
@@ -954,6 +964,8 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
 
     nxt_router_apps_sort(task, router, tmcf);
 
+    nxt_router_apps_hash_use(task, rtcf, 1);
+
     nxt_router_engines_post(router, tmcf);
 
     nxt_queue_add(&router->sockets, &updating_sockets);
@@ -1012,7 +1024,7 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
     nxt_debug(task, "rtcf %p: %D", rtcf, count);
 
     if (count == 0) {
-        nxt_http_routes_cleanup(task, rtcf->routes);
+        nxt_router_apps_hash_use(task, rtcf, -1);
 
         nxt_router_access_log_release(task, lock, rtcf->access_log);
 
@@ -1057,16 +1069,6 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
 
     rtcf = tmcf->router_conf;
 
-    nxt_http_routes_cleanup(task, rtcf->routes);
-
-    nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
-
-        if (skcf->action != NULL) {
-            nxt_http_action_cleanup(task, skcf->action);
-        }
-
-    } nxt_queue_loop;
-
     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
 
         nxt_router_app_unlink(task, app);
@@ -1406,6 +1408,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
 
             nxt_queue_remove(&prev->link);
             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
+
+            ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
+            if (nxt_slow_path(ret != NXT_OK)) {
+                goto fail;
+            }
+
             continue;
         }
 
@@ -1543,6 +1551,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
 
         nxt_queue_insert_tail(&tmcf->apps, &app->link);
 
+        ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
+        if (nxt_slow_path(ret != NXT_OK)) {
+            goto app_fail;
+        }
+
         nxt_router_app_use(task, app, 1);
 
         app->joint = app_joint;
@@ -1717,7 +1730,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
 
         /* COMPATIBILITY: listener application. */
         } else if (lscf.application.length > 0) {
-            skcf->action = nxt_http_pass_application(task, tmcf,
+            skcf->action = nxt_http_pass_application(task,
+                                                     tmcf->router_conf,
                                                      &lscf.application);
         }
     }
@@ -1959,20 +1973,106 @@ nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
 }
 
 
-void
-nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
+static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
+    NXT_LVLHSH_DEFAULT,
+    nxt_router_apps_hash_test,
+    nxt_mp_lvlhsh_alloc,
+    nxt_mp_lvlhsh_free,
+};
+
+
+static nxt_int_t
+nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+    nxt_app_t  *app;
+
+    app = data;
+
+    return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
+}
+
+
+static nxt_int_t
+nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
+{
+    nxt_lvlhsh_query_t  lhq;
+
+    lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
+    lhq.replace = 0;
+    lhq.key = app->name;
+    lhq.value = app;
+    lhq.proto = &nxt_router_apps_hash_proto;
+    lhq.pool = rtcf->mem_pool;
+
+    switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
+
+    case NXT_OK:
+        return NXT_OK;
+
+    case NXT_DECLINED:
+        nxt_thread_log_alert("router app hash adding failed: "
+                             "\"%V\" is already in hash", &lhq.key);
+        /* Fall through. */
+    default:
+        return NXT_ERROR;
+    }
+}
+
+
+static nxt_app_t *
+nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
+{
+    nxt_lvlhsh_query_t  lhq;
+
+    lhq.key_hash = nxt_djb_hash(name->start, name->length);
+    lhq.key = *name;
+    lhq.proto = &nxt_router_apps_hash_proto;
+
+    if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
+        return NULL;
+    }
+
+    return lhq.value;
+}
+
+
+static void
+nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
+{
+    nxt_app_t          *app;
+    nxt_lvlhsh_each_t  lhe;
+
+    nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
+
+    for ( ;; ) {
+        app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
+
+        if (app == NULL) {
+            break;
+        }
+
+        nxt_router_app_use(task, app, i);
+    }
+}
+
+
+
+nxt_int_t
+nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
     nxt_http_action_t *action)
 {
     nxt_app_t  *app;
 
-    app = nxt_router_app_find(&tmcf->apps, name);
+    app = nxt_router_apps_hash_get(rtcf, name);
 
     if (app == NULL) {
-        app = nxt_router_app_find(&tmcf->previous, name);
+        return NXT_DECLINED;
     }
 
     action->u.application = app;
     action->handler = nxt_http_application_handler;
+
+    return NXT_OK;
 }
 
 
@@ -3201,24 +3301,18 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
 
     nxt_thread_spin_unlock(lock);
 
-    if (skcf != NULL) {
-        if (skcf->action != NULL) {
-            nxt_http_action_cleanup(task, skcf->action);
-        }
-
 #if (NXT_TLS)
-        if (skcf->tls != NULL) {
-            task->thread->runtime->tls->server_free(task, skcf->tls);
-        }
-#endif
+    if (skcf != NULL && skcf->tls != NULL) {
+        task->thread->runtime->tls->server_free(task, skcf->tls);
     }
+#endif
 
     /* TODO remove engine->port */
 
     if (rtcf != NULL) {
         nxt_debug(task, "old router conf is destroyed");
 
-        nxt_http_routes_cleanup(task, rtcf->routes);
+        nxt_router_apps_hash_use(task, rtcf, -1);
 
         nxt_router_access_log_release(task, lock, rtcf->access_log);
 
@@ -4126,24 +4220,6 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
 }
 
 
-void
-nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
-{
-    int  c;
-
-    c = nxt_atomic_fetch_add(&app->use_count, i);
-
-    if (i < 0 && c == -i) {
-
-        if (task->thread->engine != app->engine) {
-            nxt_event_engine_post(app->engine, &app->joint->free_app_work);
-
-        } else {
-            nxt_router_free_app(task, app->joint, NULL);
-        }
-    }
-}
-
 
 nxt_inline nxt_port_t *
 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
@@ -4183,6 +4259,25 @@ nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
 }
 
 
+static void
+nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
+{
+    int  c;
+
+    c = nxt_atomic_fetch_add(&app->use_count, i);
+
+    if (i < 0 && c == -i) {
+
+        if (task->thread->engine != app->engine) {
+            nxt_event_engine_post(app->engine, &app->joint->free_app_work);
+
+        } else {
+            nxt_router_free_app(task, app->joint, NULL);
+        }
+    }
+}
+
+
 static void
 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
 {
-- 
cgit 

From b04b5ce430ef055a7552b9fc451ca23f7d5effb3 Mon Sep 17 00:00:00 2001
From: Max Romanov
Date: Thu, 13 Aug 2020 16:08:38 +0300
Subject: Fixing a router assertion caused by an application prefork error.

The buffer for an application prefork request is allocated from the temporary
configuration mem_pool.  If an error response from the main process is
received before the buffer completion handler runs, the temporary
configuration mem_pool is destroyed, and the router may crash in the
completion handler.

The assertion "src/nxt_buf.c:208 assertion failed: data == b->parent" is
triggered when NXT_DEBUG_ALLOC is enabled in configure.

This patch disables the completion handler; the memory allocated for the
buffer is now released together with the memory pool.
---
 src/nxt_router.c | 8 ++++++++
 1 file changed, 8 insertions(+)

(limited to 'src/nxt_router.c')

diff --git a/src/nxt_router.c b/src/nxt_router.c
index 1318eeb4..0e1de6fa 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -219,6 +219,8 @@ static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
     void *data);
 
+static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
+    void *data);
 static void nxt_router_app_prepare_request(nxt_task_t *task,
     nxt_request_rpc_data_t *req_rpc_data);
 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
@@ -2218,6 +2220,8 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task,
         goto fail;
     }
 
+    b->completion_handler = nxt_router_dummy_buf_completion;
+
     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
 
     rt = task->thread->runtime;
@@ -2446,6 +2450,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
         goto fail;
     }
 
+    b->completion_handler = nxt_router_dummy_buf_completion;
+
     nxt_buf_cpystr(b, &app->name);
     *b->mem.free++ = '\0';
     nxt_buf_cpystr(b, &app->conf);
@@ -3469,6 +3475,8 @@ nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
         goto fail;
     }
 
+    b->completion_handler = nxt_router_dummy_buf_completion;
+
     nxt_buf_cpystr(b, &access_log->path);
     *b->mem.free++ = '\0';
-- 
cgit
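
The idea behind this last fix generalizes beyond the prefork call site: when
a buffer lives in a memory pool that may be destroyed before the I/O
completion callback runs, pointing the callback at a no-op removes the window
for a use-after-free, and the buffer's memory is simply reclaimed with the
pool itself.  A compressed sketch of that ownership rule (hypothetical toy
types; Unit's real nxt_buf_t and memory-pool API differ):

    #include <stddef.h>
    #include <stdlib.h>

    typedef struct buf_s  buf_t;

    struct buf_s {
        void   (*completion_handler)(buf_t *b);
        buf_t  *next;
    };

    typedef struct {
        void    *mem[16];   /* toy pool: every allocation recorded here */
        size_t  count;
    } pool_t;

    /* No-op completion: the pool, not the I/O path, owns the buffer. */
    static void
    dummy_buf_completion(buf_t *b)
    {
        (void) b;
    }

    static void *
    pool_alloc(pool_t *mp, size_t size)
    {
        void  *p;

        if (mp->count >= 16) {
            return NULL;
        }

        p = malloc(size);
        if (p != NULL) {
            mp->mem[mp->count++] = p;
        }

        return p;
    }

    static void
    pool_destroy(pool_t *mp)
    {
        while (mp->count > 0) {
            free(mp->mem[--mp->count]);
        }
    }

    static buf_t *
    request_buf_create(pool_t *mp)
    {
        buf_t  *b;

        b = pool_alloc(mp, sizeof(buf_t));
        if (b != NULL) {
            /* Neutralize the callback so destroying the pool early
             * cannot race with a completion touching freed memory. */
            b->completion_handler = dummy_buf_completion;
            b->next = NULL;
        }

        return b;
    }

With this arrangement, pool_destroy() may run at any point relative to the
completion callback without double-free or use-after-free, which is exactly
the property the patch above appears to restore for the prefork, listen
socket, and access log request buffers.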