From a1e9df2aef5a3917728c6fd37280b03020d51123 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:30 +0300 Subject: Port message extended to transfer 2 file descriptors. --- src/nxt_port_socket.c | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 4e3eaef6..844b65ca 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -156,6 +156,7 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = b; msg.share = 0; msg.fd = fd; + msg.fd2 = -1; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; @@ -331,7 +332,7 @@ next_fragment: msg->port_msg.last |= sb.last; msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; - n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); + n = nxt_socketpair_send(&port->socket, &msg->fd, iov, sb.niov + 1); if (n > 0) { if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { @@ -346,6 +347,12 @@ next_fragment: msg->fd = -1; } + if (msg->fd2 != -1 && msg->close_fd != 0) { + nxt_fd_close(msg->fd2); + + msg->fd2 = -1; + } + msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, m == NXT_PORT_METHOD_MMAP); @@ -358,6 +365,7 @@ next_fragment: * in the first message of a stream. */ msg->fd = -1; + msg->fd2 = -1; msg->share += n; msg->port_msg.nf = 1; msg->port_msg.tracking = 0; @@ -810,6 +818,10 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_fd_close(msg->fd); } + if (msg->fd2 != -1) { + nxt_fd_close(msg->fd2); + } + return; } @@ -854,6 +866,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, msg->buf = fmsg->buf; msg->fd = fmsg->fd; + msg->fd2 = fmsg->fd2; /* * To disable instant completion or buffer re-usage, @@ -888,12 +901,17 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_fast_path(msg->cancelled == 0)) { msg->buf = NULL; msg->fd = -1; + msg->fd2 = -1; b = NULL; } else { if (msg->fd != -1) { nxt_fd_close(msg->fd); } + + if (msg->fd2 != -1) { + nxt_fd_close(msg->fd2); + } } } else { if (nxt_fast_path(msg->cancelled == 0)) { @@ -999,6 +1017,12 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; } + if (msg->fd2 != -1 && msg->close_fd != 0) { + nxt_fd_close(msg->fd2); + + msg->fd2 = -1; + } + for (b = msg->buf; b != NULL; b = next) { next = b->next; b->next = NULL; -- cgit From e227fc9e6281c280c46139a81646ecd7b0510e2b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:34 +0300 Subject: Introducing application and port shared memory queues. The goal is to minimize the number of syscalls needed to deliver a message. --- src/nxt_port_socket.c | 271 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 250 insertions(+), 21 deletions(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 844b65ca..14e2e605 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -5,6 +5,7 @@ */ #include +#include static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, @@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, + void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); @@ -143,12 +146,15 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg) nxt_int_t -nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, - void *tracking) +nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, + nxt_buf_t *b) { + int notify; + uint8_t *p; nxt_int_t res; nxt_port_send_msg_t msg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; msg.link.next = NULL; msg.link.prev = NULL; @@ -156,14 +162,10 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = b; msg.share = 0; msg.fd = fd; - msg.fd2 = -1; + msg.fd2 = fd2; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; - if (tracking != NULL) { - nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); - } - msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; @@ -172,7 +174,42 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; - msg.port_msg.tracking = tracking != NULL; + + if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { + + if (fd == -1 + && (b == NULL + || nxt_buf_mem_used_size(&b->mem) + <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)))) + { + p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t)); + if (b != NULL) { + p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem)); + } + + res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) (p - qmsg), notify, res); + + if (notify == 0) { + return res; + } + + msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; + msg.buf = NULL; + + } else { + qmsg[0] = _NXT_PORT_MSG_READ_SOCKET; + + res = nxt_port_queue_send(port->queue, qmsg, 1, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + notify, res); + } + } res = nxt_port_msg_chk_insert(task, port, &msg); if (nxt_fast_path(res == NXT_DECLINED)) { @@ -308,10 +345,6 @@ next_fragment: port->max_size / PORT_MMAP_MIN_SIZE); } - if (msg->port_msg.tracking) { - iov[0].iov_len += sizeof(msg->tracking_msg); - } - sb.limit -= iov[0].iov_len; nxt_sendbuf_mem_coalesce(task, &sb); @@ -368,7 +401,6 @@ next_fragment: msg->fd2 = -1; msg->share += n; msg->port_msg.nf = 1; - msg->port_msg.tracking = 0; if (msg->share >= port->max_share) { msg->share = 0; @@ -576,7 +608,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) port->engine = task->thread->engine; port->socket.read_work_queue = &port->engine->fast_work_queue; - port->socket.read_handler = nxt_port_read_handler; + port->socket.read_handler = port->queue != NULL + ? nxt_port_queue_read_handler + : nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; nxt_fd_event_enable_read(port->engine, &port->socket); @@ -660,6 +694,206 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +static void +nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_buf_t *b; + nxt_port_t *port; + struct iovec iov[2]; + nxt_port_queue_t *queue; + nxt_port_recv_msg_t msg, *smsg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; + + port = nxt_container_of(obj, nxt_port_t, socket); + msg.port = port; + + nxt_assert(port->engine == task->thread->engine); + + queue = port->queue; + nxt_atomic_fetch_add(&queue->nitems, 1); + + for ( ;; ) { + + if (port->from_socket == 0) { + n = nxt_port_queue_recv(queue, qmsg); + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + + n = nxt_port_queue_recv(queue, qmsg); + if (n < 0) { + return; + } + + nxt_atomic_fetch_add(&queue->nitems, 1); + } + + if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { + port->from_socket++; + + nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", + (int) port->pid, (int) port->id, port->socket.fd, + port->from_socket); + + n = -1; + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: dequeue %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + } else { + if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { + msg.port_msg = smsg->port_msg; + b = smsg->buf; + n = smsg->size; + msg.fd = smsg->fd; + msg.fd2 = smsg->fd2; + + smsg->size = 0; + + port->from_socket--; + + nxt_debug(task, "port{%d,%d} %d: use suspended message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + goto process; + } + + n = -1; + } + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + return; + } + + b = nxt_port_buf_alloc(port); + + if (nxt_slow_path(b == NULL)) { + /* TODO: disable event for some time */ + } + + if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); + + if (n > (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), + n - sizeof(nxt_port_msg_t)); + } + + } else { + iov[0].iov_base = &msg.port_msg; + iov[0].iov_len = sizeof(nxt_port_msg_t); + + iov[1].iov_base = b->mem.pos; + iov[1].iov_len = port->max_size; + + n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); + + if (n == (ssize_t) sizeof(nxt_port_msg_t) + && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) + { + nxt_port_buf_free(port, b); + + nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: recvmsg %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + if (n > 0) { + if (port->from_socket == 0) { + nxt_debug(task, "port{%d,%d} %d: suspend message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + smsg = port->socket_msg; + + if (nxt_slow_path(smsg == NULL)) { + smsg = nxt_mp_alloc(port->mem_pool, + sizeof(nxt_port_recv_msg_t)); + + if (nxt_slow_path(smsg == NULL)) { + nxt_alert(task, "port{%d,%d} %d: suspend message " + "failed", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + + port->socket_msg = smsg; + + } else { + if (nxt_slow_path(smsg->size != 0)) { + nxt_alert(task, "port{%d,%d} %d: too many suspend " + "messages", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + } + + smsg->port_msg = msg.port_msg; + smsg->buf = b; + smsg->size = n; + smsg->fd = msg.fd; + smsg->fd2 = msg.fd2; + + continue; + } + + port->from_socket--; + } + } + + process: + + if (n > 0) { + msg.buf = b; + msg.size = n; + + nxt_port_read_msg_process(task, port, &msg); + + /* + * To disable instant completion or buffer re-usage, + * handler should reset 'msg.buf'. + */ + if (msg.buf == b) { + nxt_port_buf_free(port, b); + } + + continue; + } + + if (n == NXT_AGAIN) { + nxt_port_buf_free(port, b); + + nxt_fd_event_enable_read(task->thread->engine, &port->socket); + + continue; + } + + /* n == 0 || n == NXT_ERROR */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_port_error_handler, task, &port->socket, NULL); + return; + } +} + + typedef struct { uint32_t stream; uint32_t pid; @@ -831,12 +1065,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = orig_b = msg->buf; b->mem.free += msg->size; - if (msg->port_msg.tracking) { - msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; - - } else { - msg->cancelled = 0; - } + msg->cancelled = 0; if (nxt_slow_path(msg->port_msg.nf != 0)) { -- cgit From f147943f6382c0e90a216615ff9bcf57a3db8c75 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 21:48:27 +0300 Subject: Style fixes for 2 file descriptors transfer over port. Two consecutive fd and fd2 fields replaced with array. --- src/nxt_port_socket.c | 78 +++++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 37 deletions(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 14e2e605..5ca2eb38 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -161,8 +161,8 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = b; msg.share = 0; - msg.fd = fd; - msg.fd2 = fd2; + msg.fd[0] = fd; + msg.fd[1] = fd2; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; @@ -365,7 +365,7 @@ next_fragment: msg->port_msg.last |= sb.last; msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; - n = nxt_socketpair_send(&port->socket, &msg->fd, iov, sb.niov + 1); + n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); if (n > 0) { if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { @@ -374,16 +374,18 @@ next_fragment: goto fail; } - if (msg->fd != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd); + if (msg->close_fd) { + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); - msg->fd = -1; - } + msg->fd[0] = -1; + } - if (msg->fd2 != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd2); + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); - msg->fd2 = -1; + msg->fd[1] = -1; + } } msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, @@ -397,8 +399,8 @@ next_fragment: * A file descriptor is sent only * in the first message of a stream. */ - msg->fd = -1; - msg->fd2 = -1; + msg->fd[0] = -1; + msg->fd[1] = -1; msg->share += n; msg->port_msg.nf = 1; @@ -654,7 +656,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); if (n > 0) { @@ -750,8 +752,8 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) msg.port_msg = smsg->port_msg; b = smsg->buf; n = smsg->size; - msg.fd = smsg->fd; - msg.fd2 = smsg->fd2; + msg.fd[0] = smsg->fd[0]; + msg.fd[1] = smsg->fd[1]; smsg->size = 0; @@ -793,7 +795,7 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); if (n == (ssize_t) sizeof(nxt_port_msg_t) && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) @@ -848,8 +850,8 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) smsg->port_msg = msg.port_msg; smsg->buf = b; smsg->size = n; - smsg->fd = msg.fd; - smsg->fd2 = msg.fd2; + smsg->fd[0] = msg.fd[0]; + smsg->fd[1] = msg.fd[1]; continue; } @@ -1048,12 +1050,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_alert(task, "port %d: too small message:%uz", port->socket.fd, msg->size); - if (msg->fd != -1) { - nxt_fd_close(msg->fd); + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); } - if (msg->fd2 != -1) { - nxt_fd_close(msg->fd2); + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); } return; @@ -1094,8 +1096,8 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, port->handler(task, fmsg); msg->buf = fmsg->buf; - msg->fd = fmsg->fd; - msg->fd2 = fmsg->fd2; + msg->fd[0] = fmsg->fd[0]; + msg->fd[1] = fmsg->fd[1]; /* * To disable instant completion or buffer re-usage, @@ -1129,17 +1131,17 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_fast_path(msg->cancelled == 0)) { msg->buf = NULL; - msg->fd = -1; - msg->fd2 = -1; + msg->fd[0] = -1; + msg->fd[1] = -1; b = NULL; } else { - if (msg->fd != -1) { - nxt_fd_close(msg->fd); + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); } - if (msg->fd2 != -1) { - nxt_fd_close(msg->fd2); + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); } } } else { @@ -1240,16 +1242,18 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - if (msg->fd != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd); + if (msg->close_fd) { + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); - msg->fd = -1; - } + msg->fd[0] = -1; + } - if (msg->fd2 != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd2); + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); - msg->fd2 = -1; + msg->fd[1] = -1; + } } for (b = msg->buf; b != NULL; b = next) { -- cgit From 2136eb411c9b99ffd65751bd13e10ce426be2492 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 12 Aug 2020 13:37:49 +0300 Subject: Fixing issues found by static analyzer. --- src/nxt_port_socket.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 5ca2eb38..9d8096b2 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -208,6 +208,10 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, notify, res); + + if (nxt_slow_path(res == NXT_AGAIN)) { + return NXT_AGAIN; + } } } @@ -738,8 +742,6 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) (int) port->pid, (int) port->id, port->socket.fd, port->from_socket); - n = -1; - continue; } -- cgit