From 83595606121a821f9e3cef0f0b7e7fe87eb1e50a Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:15 +0300 Subject: Introducing the shared application port. This is the port shared between all application processes which use it to pass requests for processing. Using it significantly simplifies the request processing code in the router. The drawback is 2 more file descriptors per each configured application and more complex libunit message wait/read code. --- src/nxt_port.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/nxt_port.c') diff --git a/src/nxt_port.c b/src/nxt_port.c index 7232c465..54434d70 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -67,8 +67,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); - nxt_queue_init(&port->pending_requests); - nxt_queue_init(&port->active_websockets); } else { nxt_mp_destroy(mp); -- cgit From e227fc9e6281c280c46139a81646ecd7b0510e2b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:34 +0300 Subject: Introducing application and port shared memory queues. The goal is to minimize the number of syscalls needed to deliver a message. --- src/nxt_port.c | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) (limited to 'src/nxt_port.c') diff --git a/src/nxt_port.c b/src/nxt_port.c index 54434d70..c9189d7c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include #include #include +#include static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -68,6 +69,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); + port->queue_fd = -1; + } else { nxt_mp_destroy(mp); } @@ -99,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) nxt_router_app_port_close(task, port); } } + + if (port->queue_fd != -1) { + nxt_fd_close(port->queue_fd); + port->queue_fd = -1; + } + + if (port->queue != NULL) { + nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + port->queue = NULL; + } } @@ -176,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +/* TODO join with process_ready and move to nxt_main_process.c */ nxt_inline void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port, uint32_t stream) @@ -227,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, msg->max_share = port->max_share; msg->type = new_port->type; - return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->pair[1], stream, 0, b); + return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, + new_port->pair[1], new_port->queue_fd, + stream, 0, b); } @@ -279,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->u.new_port = port; } - +/* TODO move to nxt_main_process.c */ void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -304,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); + if (msg->fd != -1) { + port->queue_fd = msg->fd; + port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, + 0); + } + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); } -- cgit From f147943f6382c0e90a216615ff9bcf57a3db8c75 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 21:48:27 +0300 Subject: Style fixes for 2 file descriptors transfer over port. Two consecutive fd and fd2 fields replaced with array. --- src/nxt_port.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'src/nxt_port.c') diff --git a/src/nxt_port.c b/src/nxt_port.c index c9189d7c..dbcdec11 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -261,15 +261,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) /* TODO check b size and make plain */ nxt_debug(task, "new port %d received for process %PI:%d", - msg->fd, new_port_msg->pid, new_port_msg->id); + msg->fd[0], new_port_msg->pid, new_port_msg->id); port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); if (port != NULL) { nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); - nxt_fd_close(msg->fd); - msg->fd = -1; + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; return; } @@ -280,10 +280,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_fd_nonblocking(task, msg->fd); + nxt_fd_nonblocking(task, msg->fd[0]); port->pair[0] = -1; - port->pair[1] = msg->fd; + port->pair[1] = msg->fd[0]; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; @@ -319,11 +319,11 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); - if (msg->fd != -1) { - port->queue_fd = msg->fd; + if (msg->fd[0] != -1) { + port->queue_fd = msg->fd[0]; port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), - PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, - 0); + PROT_READ | PROT_WRITE, MAP_SHARED, + msg->fd[0], 0); } nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); @@ -338,7 +338,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - if (nxt_slow_path(msg->fd == -1)) { + if (nxt_slow_path(msg->fd[0] == -1)) { nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); return; @@ -352,11 +352,11 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto fail_close; } - nxt_port_incoming_port_mmap(task, process, msg->fd); + nxt_port_incoming_port_mmap(task, process, msg->fd[0]); fail_close: - nxt_fd_close(msg->fd); + nxt_fd_close(msg->fd[0]); } @@ -409,14 +409,14 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) log_file = nxt_list_elt(rt->log_files, slot); - nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); + nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd); /* * The old log file descriptor must be closed at the moment when no * other threads use it. dup2() allows to use the old file descriptor * for new log file. This change is performed atomically in the kernel. */ - if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { + if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) { if (slot == 0) { (void) nxt_file_stderr(log_file); } -- cgit