From 5fa5b1464f2a0623bb7ee7c68ff6f9d18e744ed4 Mon Sep 17 00:00:00 2001 From: Zhidao HONG Date: Sat, 9 Oct 2021 10:44:31 +0800 Subject: Configuration: automatic migration to the new "share" behavior. --- src/nxt_main_process.c | 65 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 20 deletions(-) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 16c6a297..559789df 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -63,6 +63,8 @@ static void nxt_main_port_modules_handler(nxt_task_t *task, static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, + const char *name, u_char *buf, size_t size); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -77,6 +79,8 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; +nxt_uint_t nxt_conf_ver; + static nxt_bool_t nxt_exiting; @@ -1419,11 +1423,10 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { void *p; - size_t size; - ssize_t n; + size_t n, size; nxt_int_t ret; - nxt_file_t file; nxt_runtime_t *rt; + u_char ver[NXT_INT_T_LEN]; p = MAP_FAILED; @@ -1457,29 +1460,20 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); - nxt_memzero(&file, sizeof(nxt_file_t)); - rt = task->thread->runtime; - file.name = (nxt_file_name_t *) rt->conf_tmp; + if (nxt_conf_ver != NXT_VERNUM) { + n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver; - if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, - NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) - != NXT_OK)) - { - goto error; - } - - n = nxt_file_write(&file, p, size, 0); - - nxt_file_close(task, &file); + ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n); + if (nxt_slow_path(ret != NXT_OK)) { + goto error; + } - if (nxt_slow_path(n != (ssize_t) size)) { - (void) nxt_file_delete(file.name); - goto error; + nxt_conf_ver = NXT_VERNUM; } - ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); + ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size); if (nxt_fast_path(ret == NXT_OK)) { goto cleanup; @@ -1502,6 +1496,37 @@ cleanup: } +static nxt_int_t +nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name, + u_char *buf, size_t size) +{ + ssize_t n; + nxt_int_t ret; + nxt_file_t file; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) name; + + ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE, + NXT_FILE_OWNER_ACCESS); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + n = nxt_file_write(&file, buf, size, 0); + + nxt_file_close(task, &file); + + if (nxt_slow_path(n != (ssize_t) size)) { + (void) nxt_file_delete(file.name); + return NXT_ERROR; + } + + return nxt_file_rename(file.name, (nxt_file_name_t *) name); +} + + static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { -- cgit From 78a4063063b8f810a616d74a358ef150faff96db Mon Sep 17 00:00:00 2001 From: Zhidao HONG Date: Tue, 12 Oct 2021 10:32:17 +0800 Subject: Removed unused declarations. Declarations became unused after 6976d36be926. No functional changes. --- src/nxt_main_process.c | 4 ---- 1 file changed, 4 deletions(-) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 559789df..86425dab 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -31,10 +31,6 @@ typedef struct { } nxt_conf_app_map_t; -extern nxt_port_handlers_t nxt_controller_process_port_handlers; -extern nxt_port_handlers_t nxt_router_process_port_handlers; - - static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -- cgit From bba97134e983541e94cf73e93900729e3a3e61fc Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 28 Oct 2021 17:46:54 +0300 Subject: Moving request limit control to libunit. Introducting application graceful stop. For now only used when application process reach request limit value. This closes #585 issue on GitHub. --- src/nxt_main_process.c | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 86425dab..fe21aeb5 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -154,6 +154,12 @@ static nxt_conf_map_t nxt_common_app_limits_conf[] = { offsetof(nxt_common_app_conf_t, shm_limit), }, + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, request_limit), + }, + }; @@ -392,6 +398,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; + app_conf->request_limit = 0; start += app_conf->name.length + 1; -- cgit From ff6a7053f500414dc74568a4e49adbac7f0cf634 Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Tue, 9 Nov 2021 15:48:44 +0300 Subject: Introduced SCM_CREDENTIALS / SCM_CREDS in the socket control msgs. --- src/nxt_main_process.c | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index fe21aeb5..3ca46202 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -355,6 +355,19 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; + port = rt->port_by_type[NXT_PROCESS_ROUTER]; + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "router port not found"); + return; + } + + if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) { + nxt_alert(task, "process %PI cannot start processes", + nxt_recv_msg_cmsg_pid(msg)); + + return; + } + process = nxt_main_process_new(task, rt); if (nxt_slow_path(process == NULL)) { return; @@ -1023,6 +1036,13 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) { + nxt_alert(task, "process %PI cannot create listener sockets", + msg->port_msg.pid); + + return; + } + b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; @@ -1266,6 +1286,7 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { + nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid); return; } @@ -1428,9 +1449,19 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void *p; size_t n, size; nxt_int_t ret; + nxt_port_t *ctl_port; nxt_runtime_t *rt; u_char ver[NXT_INT_T_LEN]; + rt = task->thread->runtime; + + ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) { + nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid); + return; + } + p = MAP_FAILED; /* @@ -1463,8 +1494,6 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); - rt = task->thread->runtime; - if (nxt_conf_ver != NXT_VERNUM) { n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver; -- cgit From 1de660b6df93c09719361e364211c7c6388c01ce Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Tue, 9 Nov 2021 15:48:44 +0300 Subject: Changed nxt_process_* for reuse. This enables the reuse of process creation functions. --- src/nxt_main_process.c | 271 ++++++++++--------------------------------------- 1 file changed, 52 insertions(+), 219 deletions(-) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 3ca46202..eff96e14 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -34,11 +34,6 @@ typedef struct { static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_process_create(nxt_task_t *task, - const nxt_process_init_t init); -static nxt_int_t nxt_main_start_process(nxt_task_t *task, - nxt_process_t *process); -static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -49,7 +44,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); +static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -97,7 +92,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_process_create(task, nxt_discovery_process); + return nxt_process_init_start(task, nxt_discovery_process); } @@ -368,11 +363,17 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_main_process_new(task, rt); + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { return; } + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return; + } + init = nxt_process_init(process); *init = nxt_app_process; @@ -475,7 +476,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process->stream = msg->port_msg.stream; process->data.app = app_conf; - ret = nxt_main_start_process(task, process); + ret = nxt_process_start(task, process); if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { return; } @@ -617,139 +618,6 @@ nxt_main_process_title(nxt_task_t *task) } -static nxt_int_t -nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) -{ - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t *pinit; - - rt = task->thread->runtime; - - process = nxt_main_process_new(task, rt); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - process->name = init.name; - process->user_cred = &rt->user_cred; - - pinit = nxt_process_init(process); - *pinit = init; - - ret = nxt_main_start_process(task, process); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_process_use(task, process, -1); - } - - return ret; -} - - -static nxt_process_t * -nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_t *process; - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - process->mem_pool = nxt_mp_create(1024, 128, 256, 32); - if (process->mem_pool == NULL) { - nxt_process_use(task, process, -1); - return NULL; - } - - return process; -} - - -static nxt_int_t -nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) -{ - nxt_mp_t *tmp_mp; - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_init_t *init; - - init = nxt_process_init(process); - - port = nxt_port_new(task, 0, 0, init->type); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - nxt_process_port_add(task, process, port); - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_port; - } - - tmp_mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(tmp_mp == NULL)) { - ret = NXT_ERROR; - - goto close_port; - } - - if (init->prefork) { - ret = init->prefork(task, process, tmp_mp); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_mempool; - } - } - - pid = nxt_process_create(task, process); - - switch (pid) { - - case -1: - ret = NXT_ERROR; - break; - - case 0: - /* The child process: return to the event engine work queue loop. */ - - nxt_process_use(task, process, -1); - - ret = NXT_AGAIN; - break; - - default: - /* The main process created a new process. */ - - nxt_process_use(task, process, -1); - - nxt_port_read_close(port); - nxt_port_write_enable(task, port); - - ret = NXT_OK; - break; - } - -free_mempool: - - nxt_mp_destroy(tmp_mp); - -close_port: - - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_port_close(task, port); - } - -free_port: - - nxt_port_use(task, port, -1); - - return ret; -} - - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -879,13 +747,19 @@ fail: static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { - int status; - nxt_err_t err; - nxt_pid_t pid; + int status; + nxt_int_t ret; + nxt_err_t err; + nxt_pid_t pid; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", (int) (uintptr_t) obj, data); + rt = task->thread->runtime; + for ( ;; ) { pid = waitpid(-1, &status, WNOHANG); @@ -926,7 +800,36 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_process(task, pid); + process = nxt_runtime_process_find(rt, pid); + + if (process != NULL) { + nxt_main_process_cleanup(task, process); + + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } + + if (nxt_exiting) { + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } + + return; + } + + nxt_port_remove_notify_others(task, process); + + init = *(nxt_process_init_t *) nxt_process_init(process); + + nxt_process_close_ports(task, process); + + if (init.restart) { + ret = nxt_process_init_start(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", init.name); + } + } + } } } @@ -940,81 +843,11 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void -nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) +nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process) { - int stream; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_port_t *port; - const char *name; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t init; - - rt = task->thread->runtime; - - process = nxt_runtime_process_find(rt, pid); - if (!process) { - return; - } - if (process->isolation.cleanup != NULL) { process->isolation.cleanup(task, process); } - - name = process->name; - stream = process->stream; - init = *((nxt_process_init_t *) nxt_process_init(process)); - - if (process->state == NXT_PROCESS_STATE_READY) { - process->stream = 0; - } - - nxt_process_close_ports(task, process); - - if (nxt_exiting) { - if (rt->nprocesses <= 1) { - nxt_runtime_quit(task, 0); - } - - return; - } - - nxt_runtime_process_each(rt, process) { - - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } - - port = nxt_process_port_first(process); - - if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { - continue; - } - - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); - - if (nxt_slow_path(buf == NULL)) { - continue; - } - - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, - stream, 0, buf); - - } nxt_runtime_process_loop; - - if (init.restart) { - ret = nxt_main_process_create(task, init); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_alert(task, "failed to restart %s", name); - } - } } @@ -1407,9 +1240,9 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_process_create(task, nxt_controller_process); + ret = nxt_process_init_start(task, nxt_controller_process); if (ret == NXT_OK) { - ret = nxt_main_process_create(task, nxt_router_process); + ret = nxt_process_init_start(task, nxt_router_process); } if (nxt_slow_path(ret == NXT_ERROR)) { -- cgit From e207415a78ae67b937faf7e5bcd6e5192993180a Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Tue, 9 Nov 2021 15:48:44 +0300 Subject: Introducing application prototype processes. --- src/nxt_main_process.c | 183 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 165 insertions(+), 18 deletions(-) (limited to 'src/nxt_main_process.c') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index eff96e14..a5a20d3d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -10,6 +10,7 @@ #include #include #include +#include #if (NXT_TLS) #include #endif @@ -52,6 +53,8 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, static void nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); +static void nxt_main_process_whoami_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, @@ -326,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = { static void -nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "main data: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); @@ -334,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + void *mem; + nxt_port_t *port; + + nxt_port_new_port_handler(task, msg); + + port = msg->u.new_port; + + if (port != NULL + && port->type == NXT_PROCESS_APP + && msg->fd[1] != -1) + { + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); + if (nxt_fast_path(mem != MAP_FAILED)) { + port->queue = mem; + } + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } +} + + +static void +nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *start, *p, ch; size_t type_len; @@ -374,16 +403,18 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; + init = nxt_process_init(process); - *init = nxt_app_process; + *init = nxt_proto_process; b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { goto failed; } - nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, b->mem.pos); app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); @@ -399,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) init->name = (const char *) start; process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length - + sizeof("\"\" application") + 1); + + sizeof("\"\" prototype") + 1); if (nxt_slow_path(process->name == NULL)) { goto failed; @@ -408,7 +439,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) p = (u_char *) process->name; *p++ = '"'; p = nxt_cpymem(p, init->name, app_conf->name.length); - p = nxt_cpymem(p, "\" application", 13); + p = nxt_cpymem(p, "\" prototype", 11); *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; @@ -504,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_process_find(rt, msg->port_msg.pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); - port = nxt_runtime_port_find(rt, msg->port_msg.pid, msg->port_msg.reply_port); - - if (nxt_slow_path(port == NULL)) { return; } + process = port->process; + + nxt_assert(process != NULL); + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, @@ -542,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, + .data = nxt_main_data_handler, + .new_port = nxt_main_new_port_handler, .process_created = nxt_main_process_created_handler, .process_ready = nxt_port_process_ready_handler, - .start_process = nxt_port_main_start_process_handler, + .whoami = nxt_main_process_whoami_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_main_start_process_handler, .socket = nxt_main_port_socket_handler, .modules = nxt_main_port_modules_handler, .conf_store = nxt_main_port_conf_store_handler, @@ -559,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = { }; +static void +nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *buf; + nxt_pid_t pid, ppid; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *pprocess; + + nxt_assert(msg->port_msg.reply_port == 0); + + if (nxt_slow_path(msg->buf == NULL + || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) + { + nxt_alert(task, "whoami: buffer is NULL or unexpected size"); + goto fail; + } + + nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); + + rt = task->thread->runtime; + + pprocess = nxt_runtime_process_find(rt, ppid); + if (nxt_slow_path(pprocess == NULL)) { + nxt_alert(task, "whoami: parent process %PI not found", ppid); + goto fail; + } + + pid = nxt_recv_msg_cmsg_pid(msg); + + nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, + msg->fd[0]); + + if (msg->fd[0] != -1) { + port = nxt_runtime_process_port_create(task, rt, pid, 0, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + + nxt_fd_nonblocking(task, msg->fd[0]); + + port->pair[0] = -1; + port->pair[1] = msg->fd[0]; + msg->fd[0] = -1; + + port->max_size = 16 * 1024; + port->max_share = 64 * 1024; + port->socket.task = task; + + nxt_port_write_enable(task, port); + + } else { + port = nxt_runtime_port_find(rt, pid, 0); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + } + + if (ppid != nxt_pid) { + nxt_queue_insert_tail(&pprocess->children, &port->process->link); + } + + buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + goto fail; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, + msg->port_msg.stream, 0, buf); + +fail: + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } +} + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -751,8 +863,10 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_err_t err; nxt_pid_t pid; + nxt_port_t *port; + nxt_queue_t children; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *process, *child; nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", @@ -809,7 +923,31 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) process->stream = 0; } + nxt_queue_init(&children); + + if (!nxt_queue_is_empty(&process->children)) { + nxt_queue_add(&children, &process->children); + + nxt_queue_init(&process->children); + + nxt_queue_each(child, &children, nxt_process_t, link) { + port = nxt_process_port_first(child); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } nxt_queue_loop; + } + if (nxt_exiting) { + nxt_process_close_ports(task, process); + + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + if (rt->nprocesses <= 1) { nxt_runtime_quit(task, 0); } @@ -819,6 +957,15 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_port_remove_notify_others(task, process); + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_port_remove_notify_others(task, child); + + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + init = *(nxt_process_init_t *) nxt_process_init(process); nxt_process_close_ports(task, process); -- cgit