From 50f9816daab6c1f9f51764bd488cea0dd11ce965 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 12 May 2020 16:25:16 +0300 Subject: Blocking config change when applying the initial router config. --- src/nxt_controller.c | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) (limited to 'src/nxt_controller.c') diff --git a/src/nxt_controller.c b/src/nxt_controller.c index f9b2cf26..d17b0cc6 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -47,6 +47,7 @@ static void nxt_controller_router_ready_handler(nxt_task_t *task, static nxt_int_t nxt_controller_conf_default(void); static void nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static void nxt_controller_flush_requests(nxt_task_t *task); static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); @@ -103,6 +104,7 @@ static nxt_uint_t nxt_controller_listening; static nxt_uint_t nxt_controller_router_ready; static nxt_controller_conf_t nxt_controller_conf; static nxt_queue_t nxt_controller_waiting_requests; +static nxt_bool_t nxt_controller_waiting_init_conf; static const nxt_event_conn_state_t nxt_controller_conn_read_state; @@ -245,6 +247,8 @@ nxt_controller_send_current_conf(nxt_task_t *task) nxt_controller_conf_init_handler, NULL); if (nxt_fast_path(rc == NXT_OK)) { + nxt_controller_waiting_init_conf = 1; + return; } @@ -322,6 +326,8 @@ nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, { nxt_runtime_t *rt; + nxt_controller_waiting_init_conf = 0; + if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { nxt_alert(task, "failed to apply previous configuration"); @@ -343,6 +349,25 @@ nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_controller_listening = 1; } + + nxt_controller_flush_requests(task); +} + + +static void +nxt_controller_flush_requests(nxt_task_t *task) +{ + nxt_queue_t queue; + nxt_controller_request_t *req; + + nxt_queue_init(&queue); + nxt_queue_add(&queue, &nxt_controller_waiting_requests); + + nxt_queue_init(&nxt_controller_waiting_requests); + + nxt_queue_each(req, &queue, nxt_controller_request_t, link) { + nxt_controller_process_request(task, req); + } nxt_queue_loop; } @@ -961,7 +986,9 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) { - if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { + if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) + || nxt_controller_waiting_init_conf) + { nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); return; } @@ -1076,7 +1103,9 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { - if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { + if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) + || nxt_controller_waiting_init_conf) + { nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); return; } @@ -1469,7 +1498,6 @@ static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_queue_t queue; nxt_controller_request_t *req; nxt_controller_response_t resp; @@ -1502,14 +1530,7 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_controller_response(task, req, &resp); - nxt_queue_init(&queue); - nxt_queue_add(&queue, &nxt_controller_waiting_requests); - - nxt_queue_init(&nxt_controller_waiting_requests); - - nxt_queue_each(req, &queue, nxt_controller_request_t, link) { - nxt_controller_process_request(task, req); - } nxt_queue_loop; + nxt_controller_flush_requests(task); } -- cgit From 3ec72362b93432ddd8f8fb06f8dbc0bcd8a7e95d Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 12 May 2020 16:25:24 +0300 Subject: Waiting for router instead of reporting to user on config update. --- src/nxt_controller.c | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'src/nxt_controller.c') diff --git a/src/nxt_controller.c b/src/nxt_controller.c index d17b0cc6..ea70cf78 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -76,6 +76,7 @@ static void nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req); static void nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, nxt_str_t *path); +static nxt_bool_t nxt_controller_check_postpone_request(nxt_task_t *task); #if (NXT_TLS) static void nxt_controller_process_cert(nxt_task_t *task, nxt_controller_request_t *req, nxt_str_t *path); @@ -270,6 +271,8 @@ nxt_controller_send_current_conf(nxt_task_t *task) } nxt_controller_listening = 1; + + nxt_controller_flush_requests(task); } @@ -386,9 +389,8 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) { - return NXT_DECLINED; - } + nxt_assert(router_port != NULL); + nxt_assert(nxt_controller_router_ready); controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; @@ -986,9 +988,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) { - if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) - || nxt_controller_waiting_init_conf) - { + if (nxt_controller_check_postpone_request(task)) { nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); return; } @@ -1085,10 +1085,6 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (nxt_slow_path(rc != NXT_OK)) { nxt_mp_destroy(mp); - if (rc == NXT_DECLINED) { - goto no_router; - } - /* rc == NXT_ERROR */ goto alloc_fail; } @@ -1103,9 +1099,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { - if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) - || nxt_controller_waiting_init_conf) - { + if (nxt_controller_check_postpone_request(task)) { nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); return; } @@ -1172,10 +1166,6 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, if (nxt_slow_path(rc != NXT_OK)) { nxt_mp_destroy(mp); - if (rc == NXT_DECLINED) { - goto no_router; - } - /* rc == NXT_ERROR */ goto alloc_fail; } @@ -1222,16 +1212,27 @@ alloc_fail: resp.offset = -1; nxt_controller_response(task, req, &resp); - return; +} -no_router: - resp.status = 500; - resp.title = (u_char *) "Router process isn't available."; - resp.offset = -1; +static nxt_bool_t +nxt_controller_check_postpone_request(nxt_task_t *task) +{ + nxt_port_t *router_port; + nxt_runtime_t *rt; - nxt_controller_response(task, req, &resp); - return; + if (!nxt_queue_is_empty(&nxt_controller_waiting_requests) + || nxt_controller_waiting_init_conf + || !nxt_controller_router_ready) + { + return 1; + } + + rt = task->thread->runtime; + + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + return (router_port == NULL); } -- cgit From e9e5ddd5a5d9ce99768833137eac2551a710becf Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Mon, 9 Mar 2020 16:28:25 +0000 Subject: Refactor of process management. The process abstraction has changed to: setup(task, process) start(task, process_data) prefork(task, process, mp) The prefork() occurs in the main process right before fork. The file src/nxt_main_process.c is completely free of process specific logic. The creation of a process now supports a PROCESS_CREATED state. The The setup() function of each process can set its state to either created or ready. If created, a MSG_PROCESS_CREATED is sent to main process, where external setup can be done (required for rootfs under container). The core processes (discovery, controller and router) doesn't need external setup, then they all proceeds to their start() function straight away. In the case of applications, the load of the module happens at the process setup() time and The module's init() function has changed to be the start() of the process. The module API has changed to: setup(task, process, conf) start(task, data) As a direct benefit of the PROCESS_CREATED message, the clone(2) of processes using pid namespaces now doesn't need to create a pipe to make the child block until parent setup uid/gid mappings nor it needs to receive the child pid. --- src/nxt_controller.c | 142 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 132 insertions(+), 10 deletions(-) (limited to 'src/nxt_controller.c') diff --git a/src/nxt_controller.c b/src/nxt_controller.c index ea70cf78..a61c127d 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -39,11 +39,17 @@ typedef struct { } nxt_controller_response_t; +static nxt_int_t nxt_controller_prefork(nxt_task_t *task, + nxt_process_t *process, nxt_mp_t *mp); +static nxt_int_t nxt_controller_start(nxt_task_t *task, + nxt_process_data_t *data); static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_controller_send_current_conf(nxt_task_t *task); static void nxt_controller_router_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_controller_remove_pid_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static nxt_int_t nxt_controller_conf_default(void); static void nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -83,6 +89,8 @@ static void nxt_controller_process_cert(nxt_task_t *task, static void nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name); +static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, + void *data); #endif static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -114,21 +122,117 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state; -nxt_port_handlers_t nxt_controller_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, +static const nxt_port_handlers_t nxt_controller_process_port_handlers = { + .quit = nxt_signal_quit_handler, .new_port = nxt_controller_process_new_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, .process_ready = nxt_controller_router_ready_handler, .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, + .remove_pid = nxt_controller_remove_pid_handler, .rpc_ready = nxt_port_rpc_handler, .rpc_error = nxt_port_rpc_handler, }; -nxt_int_t -nxt_controller_start(nxt_task_t *task, void *data) +const nxt_process_init_t nxt_controller_process = { + .name = "controller", + .type = NXT_PROCESS_CONTROLLER, + .prefork = nxt_controller_prefork, + .restart = 1, + .setup = nxt_process_core_setup, + .start = nxt_controller_start, + .port_handlers = &nxt_controller_process_port_handlers, + .signals = nxt_process_signals, +}; + + +static nxt_int_t +nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) +{ + ssize_t n; + nxt_int_t ret; + nxt_str_t *conf; + nxt_file_t file; + nxt_runtime_t *rt; + nxt_file_info_t fi; + nxt_controller_init_t ctrl_init; + + nxt_log(task, NXT_LOG_INFO, "controller started"); + + rt = task->thread->runtime; + + nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); + + conf = &ctrl_init.conf; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) rt->conf; + + ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); + + if (ret == NXT_OK) { + ret = nxt_file_info(&file, &fi); + + if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { + conf->length = nxt_file_size(&fi); + conf->start = nxt_mp_alloc(mp, conf->length); + if (nxt_slow_path(conf->start == NULL)) { + nxt_file_close(task, &file); + return NXT_ERROR; + } + + n = nxt_file_read(&file, conf->start, conf->length, 0); + + if (nxt_slow_path(n != (ssize_t) conf->length)) { + conf->start = NULL; + conf->length = 0; + + nxt_alert(task, "failed to restore previous configuration: " + "cannot read the file"); + } + } + + nxt_file_close(task, &file); + } + +#if (NXT_TLS) + ctrl_init.certs = nxt_cert_store_load(task, mp); + + nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt); +#endif + + process->data.controller = ctrl_init; + + return NXT_OK; +} + + +#if (NXT_TLS) + +static void +nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data) +{ + pid_t main_pid; + nxt_array_t *certs; + nxt_runtime_t *rt; + + certs = obj; + rt = data; + + main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid; + + if (nxt_pid == main_pid && certs != NULL) { + nxt_cert_store_release(certs); + } +} + +#endif + + +static nxt_int_t +nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data) { nxt_mp_t *mp; nxt_int_t ret; @@ -147,15 +251,13 @@ nxt_controller_start(nxt_task_t *task, void *data) nxt_queue_init(&nxt_controller_waiting_requests); - init = data; + init = &data->controller; #if (NXT_TLS) - if (init->certs != NULL) { nxt_cert_info_init(task, init->certs); nxt_cert_store_release(init->certs); } - #endif json = &init->conf; @@ -170,8 +272,6 @@ nxt_controller_start(nxt_task_t *task, void *data) } conf = nxt_conf_json_parse_str(mp, json); - nxt_free(json->start); - if (nxt_slow_path(conf == NULL)) { nxt_alert(task, "failed to restore previous configuration: " "file is corrupted or not enough memory"); @@ -295,6 +395,28 @@ nxt_controller_router_ready_handler(nxt_task_t *task, } +static void +nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_process_t *process; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid)); + + nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid)); + + process = nxt_runtime_process_find(rt, pid); + if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) { + nxt_controller_router_ready = 0; + } + + nxt_port_remove_pid_handler(task, msg); +} + + static nxt_int_t nxt_controller_conf_default(void) { -- cgit