From c617480eefc0822d52f9153906bb526ad483b9a3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Sat, 25 Jul 2020 11:06:32 +0300 Subject: Using plain shared memory for configuration pass. There is no restrictions on configration size and using segmented shared memory only doubles memory usage because to parse configration on router side, it needs to be 'plain' e. g. located in single continous memory buffer. --- src/nxt_port_memory.c | 135 +++++++++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 57 deletions(-) (limited to 'src/nxt_port_memory.c') diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index f4d2125c..fd472cc6 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -286,7 +286,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) { void *mem; - u_char *p, name[64]; nxt_fd_t fd; nxt_int_t i; nxt_free_map_t *free_map; @@ -310,63 +309,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", - nxt_pid, nxt_random(&task->thread->random)); - *p = '\0'; - -#if (NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - - if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); - - goto remove_fail; - } - - nxt_debug(task, "memfd_create(%s): %FD", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - - nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); - - if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink((char *) name); - - fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - - nxt_debug(task, "shm_open(%s): %FD", name, fd); - + fd = nxt_shm_open(task, PORT_MMAP_SIZE); if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink((char *) name) == -1)) { - nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, - nxt_errno); - } - -#else - -#error No working shared memory implementation. - -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); - goto remove_fail; } @@ -423,6 +367,83 @@ remove_fail: } +nxt_int_t +nxt_shm_open(nxt_task_t *task, size_t size) +{ + nxt_fd_t fd; + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + + u_char *p, name[64]; + + p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", + nxt_pid, nxt_random(&task->thread->random)); + *p = '\0'; + +#endif + +#if (NXT_HAVE_MEMFD_CREATE) + + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); + + return -1; + } + + nxt_debug(task, "memfd_create(%s): %FD", name, fd); + +#elif (NXT_HAVE_SHM_OPEN_ANON) + + fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); + + return -1; + } + + nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); + +#elif (NXT_HAVE_SHM_OPEN) + + /* Just in case. */ + shm_unlink((char *) name); + + fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); + + return -1; + } + + nxt_debug(task, "shm_open(%s): %FD", name, fd); + + if (nxt_slow_path(shm_unlink((char *) name) == -1)) { + nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, + nxt_errno); + } + +#else + +#error No working shared memory implementation. + +#endif + + if (nxt_slow_path(ftruncate(fd, size) == -1)) { + nxt_alert(task, "ftruncate() failed %E", nxt_errno); + + nxt_fd_close(fd); + + return -1; + } + + return fd; +} + + static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, nxt_int_t n, nxt_bool_t tracking) -- cgit From 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:13 +0300 Subject: Changing router to application shared memory exchange protocol. The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue. --- src/nxt_port_memory.c | 73 ++++++++++++++++----------------------------------- 1 file changed, 23 insertions(+), 50 deletions(-) (limited to 'src/nxt_port_memory.c') diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index fd472cc6..1e01629e 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -282,8 +282,8 @@ fail: static nxt_port_mmap_handler_t * -nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, - nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) +nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, + nxt_bool_t tracking, nxt_int_t n) { void *mem; nxt_fd_t fd; @@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); if (nxt_slow_path(mmap_handler == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + nxt_alert(task, "failed to allocate mmap_handler"); return NULL; } - port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); + port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); if (nxt_slow_path(port_mmap == NULL)) { - nxt_log(task, NXT_LOG_WARN, - "failed to add port mmap to outgoing array"); + nxt_alert(task, "failed to add port mmap to mmaps array"); nxt_free(mmap_handler); return NULL; @@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, } mmap_handler->hdr = mem; + mmap_handler->fd = fd; port_mmap->mmap_handler = mmap_handler; nxt_port_mmap_handler_use(mmap_handler, 1); @@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - hdr->id = process->outgoing.size - 1; + hdr->id = mmaps->size - 1; hdr->src_pid = nxt_pid; - hdr->dst_pid = process->pid; - hdr->sent_over = port->id; + hdr->sent_over = 0xFFFFu; /* Mark first chunk as busy */ free_map = tracking ? hdr->free_tracking_map : hdr->free_map; @@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); - - /* TODO handle error */ - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); - - nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", - hdr->id, nxt_pid, process->pid); + nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", + hdr->id, nxt_pid); return mmap_handler; @@ -361,7 +355,7 @@ remove_fail: nxt_free(mmap_handler); - process->outgoing.size--; + mmaps->size--; return NULL; } @@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size) static nxt_port_mmap_handler_t * -nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, +nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, nxt_int_t n, nxt_bool_t tracking) { nxt_int_t i, res, nchunks; - nxt_process_t *process; nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; - process = port->process; - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - nxt_thread_mutex_lock(&process->outgoing.mutex); + nxt_thread_mutex_lock(&mmaps->mutex); - end_port_mmap = process->outgoing.elts + process->outgoing.size; + end_port_mmap = mmaps->elts + mmaps->size; - for (port_mmap = process->outgoing.elts; + for (port_mmap = mmaps->elts; port_mmap < end_port_mmap; port_mmap++) { mmap_handler = port_mmap->mmap_handler; hdr = mmap_handler->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { + if (hdr->sent_over != 0xFFFFu) { continue; } @@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, /* TODO introduce port_mmap limit and release wait. */ *c = 0; - mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); + mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); unlock_return: - nxt_thread_mutex_unlock(&process->outgoing.mutex); + nxt_thread_mutex_unlock(&mmaps->mutex); return mmap_handler; } @@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_port_mmap_tracking_t *tracking, uint32_t stream) { nxt_chunk_id_t c; @@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "request tracking for stream #%uD", stream); - mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); if (nxt_slow_path(mmap_handler == NULL)) { return NXT_ERROR; } @@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_buf_t * -nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) { nxt_mp_t *mp; nxt_buf_t *b; @@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); if (nxt_slow_path(mmap_handler == NULL)) { mp = task->thread->engine->mem_pool; nxt_mp_free(mp, b); @@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_method_t m; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; + nxt_port_method_t m; m = NXT_PORT_METHOD_ANY; @@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - mmap_handler = b->parent; - hdr = mmap_handler->hdr; - if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, "mixing plain and mmap buffers, " @@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) break; } - if (port->pid != hdr->dst_pid) { - nxt_log_error(NXT_LOG_ERR, task->log, - "send mmap buffer for %PI to %PI, " - "using plain mode", hdr->dst_pid, port->pid); - - m = NXT_PORT_METHOD_PLAIN; - - break; - } - if (m == NXT_PORT_METHOD_ANY) { nxt_debug(task, "using mmap mode"); -- cgit