From f69d4707527da8c48e93cb49f85c71c890ae8edd Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Tue, 21 Jul 2020 20:27:37 +0300 Subject: Fixed non-debug log time format in libunit. This makes log format used in libunit consistent with the daemon, where milliseconds are printed only in the debug log level. Currently a compile time switch is used, since there's no support for runtime changing of a log level for now. But in the future this should be a runtime condition, similar to nxt_log_time_handler(). --- src/nxt_unit.c | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 9f6eab95..89998e3f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4977,11 +4977,18 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) tm = *localtime(&ts.tv_sec); #endif +#if (NXT_DEBUG) p += snprintf(p, end - p, "%4d/%02d/%02d %02d:%02d:%02d.%03d ", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, (int) ts.tv_nsec / 1000000); +#else + p += snprintf(p, end - p, + "%4d/%02d/%02d %02d:%02d:%02d ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); +#endif p += snprintf(p, end - p, "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], -- cgit From ec3389b63bd7a9159d2be4a2863140f75095c7d3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:19:55 +0300 Subject: Libunit refactoring: port management. - Changed the port management callbacks to notifications, which e. g. avoids the need to call the libunit function - Added context and library instance reference counts for a safer resource release - Added the router main port initialization --- src/nxt_unit.c | 580 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 319 insertions(+), 261 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 89998e3f..8c964c7a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,16 +38,19 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); +nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream); + nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *log_fd, uint32_t *stream, uint32_t *shm_limit); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -96,8 +99,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); -static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, int i); +nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); +nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, uint32_t id); @@ -110,28 +113,35 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, nxt_unit_port_id_t *new_port, int fd); -static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static int nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, nxt_unit_process_t **process); -static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, +static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); +static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); - -static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, +static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); @@ -233,6 +243,8 @@ struct nxt_unit_read_buf_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + nxt_atomic_t use_count; + pthread_mutex_t mutex; nxt_unit_port_id_t read_port_id; @@ -269,6 +281,8 @@ struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; + nxt_atomic_t use_count; + uint32_t request_data_size; uint32_t shm_mmap_limit; @@ -277,7 +291,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t ready_port_id; + nxt_unit_port_id_t router_port_id; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -341,7 +355,7 @@ nxt_unit_init(nxt_unit_init_t *init) uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port; lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { @@ -354,17 +368,20 @@ nxt_unit_init(nxt_unit_init_t *init) { ready_port = init->ready_port; ready_stream = init->ready_stream; + router_port = init->router_port; read_port = init->read_port; lib->log_fd = init->log_fd; nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, ready_port.id.id); + nxt_unit_port_id_init(&router_port.id, router_port.id.pid, + router_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); } else { - rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream, &shm_limit); + rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &lib->log_fd, &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -380,14 +397,16 @@ nxt_unit_init(nxt_unit_init_t *init) lib->pid = read_port.id.pid; ctx = &lib->main_ctx.ctx; - rc = lib->callbacks.add_port(ctx, &ready_port); - if (rc != NXT_UNIT_OK) { - nxt_unit_alert(NULL, "failed to add ready_port"); + rc = nxt_unit_add_port(ctx, &router_port); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - rc = lib->callbacks.add_port(ctx, &read_port); + lib->router_port_id = router_port.id; + + rc = nxt_unit_add_port(ctx, &read_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to add read_port"); @@ -395,15 +414,16 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->main_ctx.read_port_id = read_port.id; - lib->ready_port_id = ready_port.id; - rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); goto fail; } + close(ready_port.out_fd); + return ctx; fail: @@ -450,6 +470,8 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); + lib->use_count = 0; + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; @@ -463,26 +485,6 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } - if (cb->add_port == NULL) { - cb->add_port = nxt_unit_add_port; - } - - if (cb->remove_port == NULL) { - cb->remove_port = nxt_unit_remove_port; - } - - if (cb->remove_pid == NULL) { - cb->remove_pid = nxt_unit_remove_pid; - } - - if (cb->quit == NULL) { - cb->quit = nxt_unit_quit; - } - - if (cb->port_send == NULL) { - cb->port_send = nxt_unit_port_send_default; - } - if (cb->port_recv == NULL) { cb->port_recv = nxt_unit_port_recv_default; } @@ -506,8 +508,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; - nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - rc = pthread_mutex_init(&ctx_impl->mutex, NULL); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); @@ -515,6 +515,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, return NXT_UNIT_ERROR; } + nxt_unit_lib_use(lib); + + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + + ctx_impl->use_count = 1; + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); @@ -540,6 +546,62 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, } +nxt_inline void +nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); +} + + +nxt_inline void +nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +{ + long c; + + c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); + + if (c == 1) { + nxt_unit_ctx_free(ctx_impl); + } +} + + +nxt_inline void +nxt_unit_lib_use(nxt_unit_impl_t *lib) +{ + nxt_atomic_fetch_add(&lib->use_count, 1); +} + + +nxt_inline void +nxt_unit_lib_release(nxt_unit_impl_t *lib) +{ + long c; + nxt_unit_process_t *process; + + c = nxt_atomic_fetch_add(&lib->use_count, -1); + + if (c == 1) { + for ( ;; ) { + pthread_mutex_lock(&lib->mutex); + + process = nxt_unit_process_pop_first(lib); + if (process == NULL) { + pthread_mutex_unlock(&lib->mutex); + + break; + } + + nxt_unit_remove_process(lib, process); + } + + pthread_mutex_destroy(&lib->mutex); + + free(lib); + } +} + + nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf) @@ -585,15 +647,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int -nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit) +nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit) { int rc; - int ready_fd, read_fd; + int ready_fd, router_fd, read_fd; char *unit_init, *version_end; long version_length; - int64_t ready_pid, read_pid; - uint32_t ready_stream, ready_id, read_id; + int64_t ready_pid, router_pid, read_pid; + uint32_t ready_stream, router_id, ready_id, read_id; unit_init = getenv(NXT_UNIT_INIT_ENV); if (nxt_slow_path(unit_init == NULL)) { @@ -621,13 +684,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, + &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 9)) { + if (nxt_slow_path(rc != 12)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -639,6 +704,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, ready_port->out_fd = ready_fd; ready_port->data = NULL; + nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); + + router_port->in_fd = -1; + router_port->out_fd = router_fd; + router_port->data = NULL; + nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); read_port->in_fd = read_fd; @@ -652,8 +723,7 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) { ssize_t res; nxt_port_msg_t msg; @@ -671,7 +741,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -684,13 +754,12 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int rc; - pid_t pid; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -749,14 +818,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } } - cb = &lib->callbacks; - switch (port_msg->type) { case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); - cb->quit(ctx); + nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -812,7 +879,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", port_msg->stream, (int) pid); - cb->remove_pid(ctx, pid); + nxt_unit_remove_pid(lib, pid); rc = NXT_UNIT_OK; break; @@ -839,7 +906,7 @@ fail: } if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + nxt_unit_process_release(recv_msg.process); } return rc; @@ -850,7 +917,6 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; - nxt_unit_impl_t *lib; nxt_unit_port_t new_port; nxt_port_msg_new_port_t *new_port_msg; @@ -894,9 +960,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->fd = -1; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return lib->callbacks.add_port(ctx, &new_port); + return nxt_unit_add_port(ctx, &new_port); } @@ -1206,7 +1270,7 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) * existence. */ if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); + nxt_unit_process_release(req_impl->process); req_impl->process = NULL; } @@ -1808,7 +1872,7 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) pthread_mutex_lock(&lib->mutex); - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); + recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -1869,15 +1933,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) } -typedef struct { - size_t len; - const char *str; -} nxt_unit_str_t; - - -#define nxt_unit_str(str) { nxt_length(str), str } - - int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) { @@ -2064,8 +2119,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2114,10 +2169,10 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, - buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + res = nxt_unit_port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2689,8 +2744,8 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) lib->callbacks.port_send(req->ctx, &req->response_port, - &msg, sizeof(msg), NULL, 0); + (void) nxt_unit_port_send(req->ctx, &req->response_port, + &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); } @@ -3006,7 +3061,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3284,8 +3339,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3382,7 +3437,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 0); + process = nxt_unit_process_find(lib, pid, 0); pthread_mutex_unlock(&lib->mutex); @@ -3444,7 +3499,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) fail: - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); return rc; } @@ -3462,15 +3517,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) } -static void -nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i) +nxt_inline void +nxt_unit_process_use(nxt_unit_process_t *process) +{ + nxt_atomic_fetch_add(&process->use_count, 1); +} + + +nxt_inline void +nxt_unit_process_release(nxt_unit_process_t *process) { long c; - c = nxt_atomic_fetch_add(&process->use_count, i); + c = nxt_atomic_fetch_add(&process->use_count, -1); - if (i < 0 && c == -i) { - nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid); + if (c == 1) { + nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); nxt_unit_mmaps_destroy(&process->incoming); nxt_unit_mmaps_destroy(&process->outgoing); @@ -3727,7 +3789,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3772,26 +3834,23 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) static nxt_unit_process_t * -nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) +nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { process = lhq.value; - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } process = malloc(sizeof(nxt_unit_process_t)); if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); + nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); return NULL; } @@ -3815,7 +3874,7 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; default: - nxt_unit_warn(ctx, "process %d insert failed", (int) pid); + nxt_unit_alert(NULL, "process %d insert failed", (int) pid); pthread_mutex_destroy(&process->outgoing.mutex); pthread_mutex_destroy(&process->incoming.mutex); @@ -3824,22 +3883,19 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; } - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } static nxt_unit_process_t * -nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) +nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (remove) { @@ -3853,7 +3909,7 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) process = lhq.value; if (!remove) { - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); } return process; @@ -3873,8 +3929,13 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_unit_ctx_use(ctx_impl); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; @@ -3887,6 +3948,8 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) } } + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3900,6 +3963,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx_impl); + pthread_mutex_lock(&ctx_impl->mutex); if (ctx_impl->pending_read_head != NULL) { @@ -3915,6 +3980,9 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); if (nxt_slow_path(rbuf == NULL)) { + + nxt_unit_ctx_release(ctx_impl); + return NXT_UNIT_ERROR; } @@ -3936,6 +4004,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_ctx_release(ctx_impl); + return rc; } @@ -3968,34 +4038,11 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) void nxt_unit_done(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { - - nxt_unit_ctx_free(&ctx_impl->ctx); - - } nxt_queue_loop; - - for ( ;; ) { - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_pop_first(lib); - if (process == NULL) { - pthread_mutex_unlock(&lib->mutex); - - break; - } - - nxt_unit_remove_process(ctx, process); - } - - pthread_mutex_destroy(&lib->mutex); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - free(lib); + nxt_unit_ctx_release(ctx_impl); } @@ -4023,9 +4070,9 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } - rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); + rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); close(fd); @@ -4038,7 +4085,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + nxt_unit_remove_port(lib, &new_port_id); free(new_ctx); @@ -4051,17 +4098,15 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) } -void -nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) +static void +nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); nxt_queue_each(req_impl, &ctx_impl->active_req, nxt_unit_request_info_impl_t, link) @@ -4102,6 +4147,8 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } + + nxt_unit_lib_release(lib); } @@ -4127,36 +4174,6 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -int -nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id) -{ - int rc, fd; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; - } - - rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); - - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - *port_id = new_port_id; - - } else { - lib->callbacks.remove_port(ctx, &new_port_id); - } - - close(fd); - - return rc; -} - - static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) { @@ -4180,7 +4197,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, lib->pid); + process = nxt_unit_process_get(lib, lib->pid); if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); @@ -4198,9 +4215,9 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); - rc = lib->callbacks.add_port(ctx, &new_port); + rc = nxt_unit_add_port(ctx, &new_port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_warn(ctx, "create_port: add_port() failed"); @@ -4269,14 +4286,13 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); - return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; + return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } -int +static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -4295,18 +4311,41 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); + if (old_port->port.data == NULL) { + old_port->port.data = port->data; + port->data = NULL; + } + + if (old_port->port.in_fd == -1) { + old_port->port.in_fd = port->in_fd; + port->in_fd = -1; + } + if (port->in_fd != -1) { close(port->in_fd); port->in_fd = -1; } + if (old_port->port.out_fd == -1) { + old_port->port.out_fd = port->out_fd; + port->out_fd = -1; + } + if (port->out_fd != -1) { close(port->out_fd); port->out_fd = -1; } + *port = old_port->port; + pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.add_port != NULL + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &old_port->port); + } + return NXT_UNIT_OK; } @@ -4314,7 +4353,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); - process = nxt_unit_process_get(ctx, port->id.pid); + process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { rc = NXT_UNIT_ERROR; goto unlock; @@ -4351,72 +4390,80 @@ unlock: pthread_mutex_unlock(&lib->mutex); if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } - return rc; -} - + if (lib->callbacks.add_port != NULL + && rc == NXT_UNIT_OK + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &new_port->port); + } -void -nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_unit_find_remove_port(ctx, port_id, NULL); + return rc; } -void -nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port) +static int +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_impl_t *lib; + int res; + nxt_unit_port_t *port; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port = NULL; + process = NULL; pthread_mutex_lock(&lib->mutex); - process = NULL; - - nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); + res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); pthread_mutex_unlock(&lib->mutex); + if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { + lib->callbacks.remove_port(&lib->unit, port); + } + + if (nxt_fast_path(port != NULL)) { + if (port->in_fd != -1) { + close(port->in_fd); + } + + if (port->out_fd != -1) { + close(port->out_fd); + } + } + if (nxt_slow_path(process != NULL)) { - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); + } + + if (nxt_fast_path(port != NULL)) { + free(port); } + + return res; } -static void -nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port, nxt_unit_process_t **process) +static int +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, + nxt_unit_port_t **r_port, nxt_unit_process_t **process) { - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(ctx, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port %d,%d not found", (int) port_id->pid, (int) port_id->id); - return; + return NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->port.in_fd, port->port.out_fd, port->port.data); - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - if (port->process != NULL) { nxt_queue_remove(&port->link); } @@ -4426,60 +4473,55 @@ nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } if (r_port != NULL) { - *r_port = port->port; + *r_port = &port->port; } - free(port); + return NXT_UNIT_OK; } -void -nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) +static void +nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 1); + process = nxt_unit_process_find(lib, pid, 1); if (nxt_slow_path(process == NULL)) { - nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); + nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); pthread_mutex_unlock(&lib->mutex); return; } - nxt_unit_remove_process(ctx, process); + nxt_unit_remove_process(lib, process); + + if (lib->callbacks.remove_pid != NULL) { + lib->callbacks.remove_pid(&lib->unit, pid); + } } static void -nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) +nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) { nxt_queue_t ports; - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_init(&ports); nxt_queue_add(&ports, &process->ports); nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_use(ctx, process, -1); - port->process = NULL; + nxt_unit_process_release(process); - /* Shortcut for default callback. */ - if (lib->callbacks.remove_port == nxt_unit_remove_port) { - nxt_queue_remove(&port->link); + /* To avoid unlink port. */ + port->process = NULL; - nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); - } + nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); } nxt_queue_loop; @@ -4489,15 +4531,27 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) nxt_queue_remove(&port->link); - lib->callbacks.remove_port(ctx, &port->port.id); + if (lib->callbacks.remove_port != NULL) { + lib->callbacks.remove_port(&lib->unit, &port->port); + } + + if (port->port.in_fd != -1) { + close(port->port.in_fd); + } + + if (port->port.out_fd != -1) { + close(port->port.out_fd); + } + + free(port); } nxt_queue_loop; - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } -void +static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { nxt_unit_impl_t *lib; @@ -4505,11 +4559,15 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->online = 0; + + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } } static ssize_t -nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { int fd; @@ -4522,35 +4580,35 @@ nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - if (nxt_fast_path(port != NULL)) { + if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { fd = port->port.out_fd; - } else { - nxt_unit_warn(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; - } + pthread_mutex_unlock(&lib->mutex); - pthread_mutex_unlock(&lib->mutex); + } else { + pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(fd == -1)) { - if (port != NULL) { - nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1", - (int) port_id->pid, (int) port_id->id); - } + nxt_unit_alert(ctx, "port_send: port %d,%d not found", + (int) port_id->pid, (int) port_id->id); - return -1; + return -NXT_UNIT_ERROR; } nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", (int) port_id->pid, (int) port_id->id, fd); - return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size); + if (lib->callbacks.port_send == NULL) { + return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); + + } else { + return lib->callbacks.port_send(ctx, port_id, buf, buf_size, + oob, oob_size); + } } -ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, +static ssize_t +nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { ssize_t res; -- cgit From bf647588ff781e606651f001b53a4e83bb34c000 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:06 +0300 Subject: Adding a reference counter to the libunit port structure. The goal is to minimize the number of (pid, id) to port hash lookups which require a library mutex lock. The response port is found once per request, while the read port is initialized at startup. --- src/nxt_unit.c | 678 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 336 insertions(+), 342 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 8c964c7a..ddfd9c80 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -70,7 +70,7 @@ static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); -static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); @@ -84,17 +84,16 @@ static nxt_unit_mmap_buf_t *nxt_unit_request_preread( static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int *n, int min_n); -static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); + nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); -static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + nxt_unit_port_t *port, int n); +static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, + nxt_unit_port_t *port, uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); @@ -121,34 +120,36 @@ static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); -static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, int *fd); +static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); -static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd); +static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port); -static int nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); -static int nxt_unit_remove_port(nxt_unit_impl_t *lib, +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); +nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port); +static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); -static int nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, - nxt_unit_port_id_t *port_id, nxt_unit_port_t **r_port, - nxt_unit_process_t **process); static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, + nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, +static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); -static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, +static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, @@ -166,7 +167,6 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; - nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_process_t *process; @@ -247,8 +247,7 @@ struct nxt_unit_ctx_impl_s { pthread_mutex_t mutex; - nxt_unit_port_id_t read_port_id; - int read_port_fd; + nxt_unit_port_t *read_port; nxt_queue_link_t link; @@ -291,7 +290,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t router_port_id; + nxt_unit_port_t *router_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -306,6 +305,8 @@ struct nxt_unit_impl_s { struct nxt_unit_port_impl_s { nxt_unit_port_t port; + nxt_atomic_t use_count; + nxt_queue_link_t link; nxt_unit_process_t *process; }; @@ -395,26 +396,23 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + ctx = &lib->main_ctx.ctx; - rc = nxt_unit_add_port(ctx, &router_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->router_port = nxt_unit_add_port(ctx, &router_port); + if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - lib->router_port_id = router_port.id; - - rc = nxt_unit_add_port(ctx, &read_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); + if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); goto fail; } - lib->main_ctx.read_port_id = read_port.id; - rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); @@ -428,7 +426,7 @@ nxt_unit_init(nxt_unit_init_t *init) fail: - free(lib); + nxt_unit_ctx_release(&lib->main_ctx); return NULL; } @@ -471,6 +469,7 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); lib->use_count = 0; + lib->router_port = NULL; rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -485,10 +484,6 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } - if (cb->port_recv == NULL) { - cb->port_recv = nxt_unit_port_recv_default; - } - return lib; fail: @@ -539,7 +534,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; - ctx_impl->read_port_fd = -1; + ctx_impl->read_port = NULL; ctx_impl->requests.slot = 0; return NXT_UNIT_OK; @@ -597,6 +592,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) pthread_mutex_destroy(&lib->mutex); + if (nxt_fast_path(lib->router_port != NULL)) { + nxt_unit_port_release(lib->router_port); + } + free(lib); } } @@ -751,7 +750,7 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, void *buf, size_t buf_size, void *oob, size_t oob_size) { int rc; @@ -917,7 +916,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { @@ -960,7 +959,14 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->fd = -1; - return nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port); + if (nxt_slow_path(port == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_port_release(port); + + return NXT_UNIT_OK; } @@ -968,6 +974,8 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; nxt_unit_request_info_t *req; @@ -996,10 +1004,27 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); + + pthread_mutex_unlock(&lib->mutex); + + if (nxt_slow_path(port == NULL)) { + nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", + recv_msg->stream, + (int) recv_msg->pid, (int) recv_msg->reply_port); + + return NXT_UNIT_ERROR; + } + req = &req_impl->req; - nxt_unit_port_id_init(&req->response_port, recv_msg->pid, - recv_msg->reply_port); + req->response_port = port; req->request = recv_msg->start; @@ -1051,8 +1076,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->callbacks.request_handler(req); return NXT_UNIT_OK; @@ -1275,6 +1298,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req_impl->process = NULL; } + if (req->response_port != NULL) { + nxt_unit_port_release(req->response_port); + + req->response_port = NULL; + } + pthread_mutex_lock(&ctx_impl->mutex); nxt_queue_remove(&req_impl->link); @@ -1793,7 +1822,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_fast_path(rc == NXT_UNIT_OK)) { req->response = NULL; req->response_buf = NULL; @@ -1846,8 +1875,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, size, mmap_buf, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, + size, size, mmap_buf, NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -2035,7 +2064,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -2050,17 +2079,15 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) static void nxt_unit_buf_send_done(nxt_unit_buf_t *buf) { - int rc; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_t *req; - nxt_unit_request_info_impl_t *req_impl; + int rc; + nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_request_info_t *req; mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); req = mmap_buf->req; - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { nxt_unit_mmap_buf_free(mmap_buf); @@ -2073,7 +2100,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) static int -nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last) { struct { @@ -2081,22 +2108,24 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - int rc; - u_char *last_used, *first_free; - ssize_t res; - nxt_chunk_id_t first_free_chunk; - nxt_unit_buf_t *buf; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; + int rc; + u_char *last_used, *first_free; + ssize_t res; + nxt_chunk_id_t first_free_chunk; + nxt_unit_buf_t *buf; + nxt_unit_impl_t *lib; + nxt_port_mmap_header_t *hdr; + nxt_unit_request_info_impl_t *req_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); buf = &mmap_buf->buf; hdr = mmap_buf->hdr; m.mmap_msg.size = buf->free - buf->start; - m.msg.stream = stream; + m.msg.stream = req_impl->stream; m.msg.pid = lib->pid; m.msg.reply_port = 0; m.msg.type = _NXT_PORT_MSG_DATA; @@ -2113,13 +2142,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + req_impl->stream, (int) m.mmap_msg.mmap_id, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = nxt_unit_port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; @@ -2149,7 +2178,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, (int) m.mmap_msg.chunk_id - (int) first_free_chunk); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", + nxt_unit_debug(req->ctx, "process %d allocated_chunks %d", mmap_buf->process->pid, (int) mmap_buf->process->outgoing.allocated_chunks); @@ -2157,19 +2186,21 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, if (nxt_slow_path(mmap_buf->plain_ptr == NULL || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) { - nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" - ": no space reserved for message header", stream); + nxt_unit_alert(req->ctx, + "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", + req_impl->stream); goto free_buf; } memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); - nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", + req_impl->stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = nxt_unit_port_send(ctx, &mmap_buf->port_id, + res = nxt_unit_port_send(req->ctx, req->response_port, buf->start - sizeof(m.msg), m.mmap_msg.size + sizeof(m.msg), NULL, 0); @@ -2337,7 +2368,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, sent = 0; if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { - nxt_unit_req_warn(req, "write: response not initialized yet"); + nxt_unit_req_alert(req, "write: response not initialized yet"); return -NXT_UNIT_ERROR; } @@ -2369,8 +2400,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, min_part_size = nxt_min(min_size, part_size); min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, part_size, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; @@ -2385,7 +2415,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; } @@ -2415,8 +2445,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_alert(req, "write: response not initialized yet"); + + return NXT_UNIT_ERROR; + } + /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { /* Enable content in headers buf. */ rc = nxt_unit_response_add_content(req, "", 0); @@ -2463,8 +2499,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, buf_size, buf_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2486,7 +2521,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2744,7 +2779,7 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) nxt_unit_port_send(req->ctx, &req->response_port, + (void) nxt_unit_port_send(req->ctx, req->response_port, &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); @@ -2765,17 +2800,14 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size, alloc_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_unit_mmap_buf_t mmap_buf; - nxt_websocket_header_t *wh; - nxt_unit_request_info_impl_t *req_impl; - char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; - - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size, alloc_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; payload_len = 0; @@ -2786,8 +2818,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, buf_size = 10 + payload_len; alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2821,8 +2852,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, if (l > 0) { if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2831,8 +2861,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2845,8 +2874,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } if (buf->free > buf->start) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); } return rc; @@ -2919,17 +2947,26 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * -nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) +nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; nxt_unit_impl_t *lib; + nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + process = nxt_unit_port_process(port); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed", + (int) port->id.pid, (int) port->id.id); + + return NULL; + } + pthread_mutex_lock(&process->outgoing.mutex); retry: @@ -2941,7 +2978,7 @@ retry: for (mm = process->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { continue; } @@ -3000,7 +3037,7 @@ retry: /* Notify router about OOSM condition. */ - res = nxt_unit_send_oosm(ctx, port_id); + res = nxt_unit_send_oosm(ctx, port); if (nxt_slow_path(res != NXT_UNIT_OK)) { return NULL; } @@ -3026,7 +3063,7 @@ retry: } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); + hdr = nxt_unit_new_mmap(ctx, port, *n); unlock: @@ -3043,7 +3080,7 @@ unlock: static int -nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { ssize_t res; nxt_port_msg_t msg; @@ -3061,7 +3098,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3163,21 +3200,29 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) static nxt_port_mmap_header_t * -nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, int n) +nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; + nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; - lib = process->lib; + process = nxt_unit_port_process(port); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed", + (int) port->id.pid, (int) port->id.id); + + return NULL; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); + nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); return NULL; } @@ -3255,7 +3300,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, hdr->id = process->outgoing.size - 1; hdr->src_pid = lib->pid; hdr->dst_pid = process->pid; - hdr->sent_over = port_id->id; + hdr->sent_over = port->id.id; /* Mark first n chunk(s) as busy */ for (i = 0; i < n; i++) { @@ -3268,7 +3313,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, pthread_mutex_unlock(&process->outgoing.mutex); - rc = nxt_unit_send_mmap(ctx, port_id, fd); + rc = nxt_unit_send_mmap(ctx, port, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { munmap(mem, PORT_MMAP_SIZE); hdr = NULL; @@ -3295,7 +3340,7 @@ remove_fail: static int -nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) +nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; nxt_port_msg_t msg; @@ -3339,7 +3384,7 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = nxt_unit_port_send(ctx, port_id, &msg, sizeof(msg), + res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; @@ -3350,8 +3395,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int -nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, +nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { int nchunks, min_nchunks; @@ -3376,8 +3421,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; + mmap_buf->process = nxt_unit_port_process(port); nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -3388,7 +3432,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); + hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { mmap_buf->hdr = NULL; @@ -3407,8 +3451,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; + mmap_buf->process = nxt_unit_port_process(port); mmap_buf->free_ptr = NULL; mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -3770,15 +3813,12 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) { - ssize_t res; - nxt_port_msg_t msg; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t port_id; + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_port_id_init(&port_id, pid, 0); - msg.stream = 0; msg.pid = lib->pid; msg.reply_port = 0; @@ -3789,7 +3829,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3893,7 +3933,6 @@ static nxt_unit_process_t * nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; nxt_unit_process_lhq_pid(&lhq, &pid); @@ -3906,13 +3945,11 @@ nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) } if (rc == NXT_OK) { - process = lhq.value; - if (!remove) { - nxt_unit_process_use(process); + nxt_unit_process_use(lhq.value); } - return process; + return lhq.value; } return NULL; @@ -3990,7 +4027,7 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, + rc = nxt_unit_process_msg(ctx, rbuf->buf, rbuf->size, rbuf->oob, sizeof(rbuf->oob)); @@ -4013,25 +4050,15 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - if (ctx_impl->read_port_fd != -1) { - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); - - } else { - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); - } + rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); } @@ -4049,52 +4076,49 @@ nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc, fd; + int rc; nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; + nxt_unit_port_t *port; nxt_unit_ctx_impl_t *new_ctx; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); if (nxt_slow_path(new_ctx == NULL)) { - nxt_unit_warn(ctx, "failed to allocate context"); + nxt_unit_alert(ctx, "failed to allocate context"); return NULL; } - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + port = nxt_unit_create_port(ctx); + if (nxt_slow_path(port == NULL)) { free(new_ctx); return NULL; } - rc = nxt_unit_send_port(ctx, &lib->router_port_id, &new_port_id, fd); + rc = nxt_unit_send_port(ctx, lib->router_port, port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_remove_port(lib, &new_port_id); - - close(fd); - - free(new_ctx); - - return NULL; + goto fail; } - close(fd); - rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_remove_port(lib, &new_port_id); - - free(new_ctx); - - return NULL; + goto fail; } - new_ctx->read_port_id = new_port_id; + new_ctx->read_port = port; return &new_ctx->ctx; + +fail: + + nxt_unit_remove_port(lib, &port->id); + nxt_unit_port_release(port); + + free(new_ctx); + + return NULL; } @@ -4144,6 +4168,10 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_remove(&ctx_impl->link); + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_port_release(ctx_impl->read_port); + } + if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } @@ -4174,12 +4202,12 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -static int -nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) +static nxt_unit_port_t * +nxt_unit_create_port(nxt_unit_ctx_t *ctx) { int rc, port_sockets[2]; nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_unit_process_t *process; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4189,7 +4217,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", strerror(errno), errno); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", @@ -4204,39 +4232,34 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) close(port_sockets[0]); close(port_sockets[1]); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); new_port.in_fd = port_sockets[0]; - new_port.out_fd = -1; + new_port.out_fd = port_sockets[1]; new_port.data = NULL; pthread_mutex_unlock(&lib->mutex); nxt_unit_process_release(process); - rc = nxt_unit_add_port(ctx, &new_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_warn(ctx, "create_port: add_port() failed"); + port = nxt_unit_add_port(ctx, &new_port); + if (nxt_slow_path(port == NULL)) { + nxt_unit_alert(ctx, "create_port: add_port() failed"); close(port_sockets[0]); close(port_sockets[1]); - - return rc; } - *port_id = new_port.id; - *fd = port_sockets[1]; - - return rc; + return port; } static int -nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd) +nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port) { ssize_t res; nxt_unit_impl_t *lib; @@ -4263,8 +4286,8 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, m.msg.mf = 0; m.msg.tracking = 0; - m.new_port.id = new_port->id; - m.new_port.pid = new_port->pid; + m.new_port.id = port->id.id; + m.new_port.pid = port->id.pid; m.new_port.type = NXT_PROCESS_APP; m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; @@ -4284,7 +4307,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); @@ -4292,13 +4315,67 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, } -static int +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + nxt_atomic_fetch_add(&port_impl->use_count, 1); +} + + +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) +{ + long c; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + c = nxt_atomic_fetch_add(&port_impl->use_count, -1); + + if (c == 1) { + nxt_unit_debug(NULL, "destroy port %d,%d", + (int) port->id.pid, (int) port->id.id); + + nxt_unit_process_release(port_impl->process); + + if (port->in_fd != -1) { + close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + + port->out_fd = -1; + } + + free(port_impl); + } +} + + +nxt_inline nxt_unit_process_t * +nxt_unit_port_process(nxt_unit_port_t *port) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + return port_impl->process; +} + + +static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port, *old_port; + nxt_unit_port_impl_t *new_port; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4311,13 +4388,13 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->id.pid, port->id.id, port->in_fd, port->out_fd); - if (old_port->port.data == NULL) { - old_port->port.data = port->data; + if (old_port->data == NULL) { + old_port->data = port->data; port->data = NULL; } - if (old_port->port.in_fd == -1) { - old_port->port.in_fd = port->in_fd; + if (old_port->in_fd == -1) { + old_port->in_fd = port->in_fd; port->in_fd = -1; } @@ -4326,8 +4403,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->in_fd = -1; } - if (old_port->port.out_fd == -1) { - old_port->port.out_fd = port->out_fd; + if (old_port->out_fd == -1) { + old_port->out_fd = port->out_fd; port->out_fd = -1; } @@ -4336,26 +4413,27 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) port->out_fd = -1; } - *port = old_port->port; + *port = *old_port; pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL && (port->in_fd != -1 || port->out_fd != -1)) { - lib->callbacks.add_port(ctx, &old_port->port); + lib->callbacks.add_port(ctx, old_port); } - return NXT_UNIT_OK; + return old_port; } + new_port = NULL; + nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", port->id.pid, port->id.id, port->in_fd, port->out_fd); process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { - rc = NXT_UNIT_ERROR; goto unlock; } @@ -4365,7 +4443,6 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { - rc = NXT_UNIT_ERROR; goto unlock; } @@ -4376,107 +4453,85 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", port->id.pid, port->id.id); + free(new_port); + + new_port = NULL; + goto unlock; } nxt_queue_insert_tail(&process->ports, &new_port->link); - rc = NXT_UNIT_OK; - + new_port->use_count = 2; new_port->process = process; + process = NULL; + unlock: pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { + if (nxt_slow_path(process != NULL)) { nxt_unit_process_release(process); } if (lib->callbacks.add_port != NULL - && rc == NXT_UNIT_OK + && new_port != NULL && (port->in_fd != -1 || port->out_fd != -1)) { lib->callbacks.add_port(ctx, &new_port->port); } - return rc; + return &new_port->port; } -static int +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - int res; - nxt_unit_port_t *port; - nxt_unit_process_t *process; - - port = NULL; - process = NULL; + nxt_unit_port_t *port; + nxt_unit_port_impl_t *port_impl; pthread_mutex_lock(&lib->mutex); - res = nxt_unit_remove_port_unsafe(lib, port_id, &port, &process); - - pthread_mutex_unlock(&lib->mutex); - - if (lib->callbacks.remove_port != NULL && res == NXT_UNIT_OK) { - lib->callbacks.remove_port(&lib->unit, port); - } + port = nxt_unit_remove_port_unsafe(lib, port_id); if (nxt_fast_path(port != NULL)) { - if (port->in_fd != -1) { - close(port->in_fd); - } + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - if (port->out_fd != -1) { - close(port->out_fd); - } + nxt_queue_remove(&port_impl->link); } - if (nxt_slow_path(process != NULL)) { - nxt_unit_process_release(process); + pthread_mutex_unlock(&lib->mutex); + + if (lib->callbacks.remove_port != NULL && port != NULL) { + lib->callbacks.remove_port(&lib->unit, port); } if (nxt_fast_path(port != NULL)) { - free(port); + nxt_unit_port_release(port); } - - return res; } -static int -nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id, - nxt_unit_port_t **r_port, nxt_unit_process_t **process) +static nxt_unit_port_t * +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_port_impl_t *port; + nxt_unit_port_t *port; port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { nxt_unit_debug(NULL, "remove_port: port %d,%d not found", (int) port_id->pid, (int) port_id->id); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, - port->port.in_fd, port->port.out_fd, port->port.data); - - if (port->process != NULL) { - nxt_queue_remove(&port->link); - } + port->in_fd, port->out_fd, port->data); - if (process != NULL) { - *process = port->process; - } - - if (r_port != NULL) { - *r_port = &port->port; - } - - return NXT_UNIT_OK; + return port; } @@ -4516,12 +4571,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_release(process); - - /* To avoid unlink port. */ - port->process = NULL; - - nxt_unit_remove_port_unsafe(lib, &port->port.id, NULL, NULL); + nxt_unit_remove_port_unsafe(lib, &port->port.id); } nxt_queue_loop; @@ -4535,15 +4585,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) lib->callbacks.remove_port(&lib->unit, &port->port); } - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - - free(port); + nxt_unit_port_release(&port->port); } nxt_queue_loop; @@ -4567,43 +4609,23 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) static ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - int fd; - nxt_unit_impl_t *lib; - nxt_unit_port_impl_t *port; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - - if (nxt_fast_path(port != NULL && port->port.out_fd != -1)) { - fd = port->port.out_fd; - - pthread_mutex_unlock(&lib->mutex); - - } else { - pthread_mutex_unlock(&lib->mutex); - - nxt_unit_alert(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - - return -NXT_UNIT_ERROR; - } + nxt_unit_impl_t *lib; - nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", - (int) port_id->pid, (int) port_id->id, fd); + nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", + (int) port->id.pid, (int) port->id.id, port->out_fd); - if (lib->callbacks.port_send == NULL) { - return nxt_unit_sendmsg(ctx, fd, buf, buf_size, oob, oob_size); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - } else { - return lib->callbacks.port_send(ctx, port_id, buf, buf_size, + if (lib->callbacks.port_send != NULL) { + return lib->callbacks.port_send(ctx, port, buf, buf_size, oob, oob_size); } + + return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); } @@ -4652,56 +4674,22 @@ retry: static ssize_t -nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int fd; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *port; + int fd; + ssize_t res; + struct iovec iov[1]; + struct msghdr msg; + nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); - - if (nxt_fast_path(port != NULL)) { - fd = port->port.in_fd; - - } else { - nxt_unit_debug(ctx, "port_recv: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; + if (lib->callbacks.port_recv != NULL) { + return lib->callbacks.port_recv(ctx, port, + buf, buf_size, oob, oob_size); } - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(fd == -1)) { - return -1; - } - - nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d", - (int) port_id->pid, (int) port_id->id, fd); - - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) { - ctx_impl->read_port_fd = fd; - } - - return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size); -} - - -ssize_t -nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size) -{ - ssize_t res; - struct iovec iov[1]; - struct msghdr msg; - iov[0].iov_base = buf; iov[0].iov_len = buf_size; @@ -4713,6 +4701,8 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, msg.msg_control = oob; msg.msg_controllen = oob_size; + fd = port->in_fd; + retry: res = recvmsg(fd, &msg, 0); @@ -4813,7 +4803,7 @@ nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) } -static nxt_unit_port_impl_t * +static nxt_unit_port_t * nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove) { @@ -4833,6 +4823,10 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, switch (res) { case NXT_OK: + if (!remove) { + nxt_unit_port_use(lhq.value); + } + return lhq.value; default: -- cgit From 3cbc22a6dc45abdeade4deb364601230ddca02c1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:10 +0300 Subject: Changing router to application port exchange protocol. The application process needs to request the port from the router instead of the latter pushing the port before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router to use the application shared port and then the queue. --- src/nxt_unit.c | 276 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 249 insertions(+), 27 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ddfd9c80..c1ef977f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); @@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; + /* for nxt_unit_port_impl_t.awaiting_req */ + nxt_queue_link_t port_wait_link; char extra_data[]; }; @@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; nxt_atomic_t use_count; + nxt_atomic_t wait_items; pthread_mutex_t mutex; @@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t ready_req; + nxt_unit_read_buf_t *pending_read_head; nxt_unit_read_buf_t **pending_read_tail; nxt_unit_read_buf_t *free_read_buf; @@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s { nxt_queue_link_t link; nxt_unit_process_t *process; + + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t awaiting_req; + + int ready; }; @@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); ctx_impl->use_count = 1; + ctx_impl->wait_items = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); + nxt_queue_init(&ctx_impl->ready_req); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); @@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { + int res; nxt_unit_impl_t *lib; - nxt_unit_port_t *port; nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; @@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", - recv_msg->stream, - (int) recv_msg->pid, (int) recv_msg->reply_port); - - return NXT_UNIT_ERROR; - } - req = &req_impl->req; - req->response_port = port; - req->request = recv_msg->start; b = recv_msg->incoming_buf; @@ -1076,12 +1074,129 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib->callbacks.request_handler(req); + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + res = nxt_unit_request_check_response_port(req, &port_id); + + if (nxt_fast_path(res == NXT_UNIT_OK)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); + } return NXT_UNIT_OK; } +static int +nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id) +{ + int res; + nxt_unit_ctx_t *ctx; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + nxt_unit_request_info_impl_t *req_impl; + + ctx = req->ctx; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + if (nxt_fast_path(port != NULL)) { + req->response_port = port; + + if (nxt_fast_path(port_impl->ready)) { + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", + (int) port->id.pid, (int) port->id.id); + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "check_response_port: " + "port{%d,%d} already requested", + (int) port->id.pid, (int) port->id.id); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&lib->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; + } + + port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + if (nxt_slow_path(port_impl == NULL)) { + nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", + (int) sizeof(nxt_unit_port_impl_t)); + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_ERROR; + } + + port = &port_impl->port; + + port->id = *port_id; + port->in_fd = -1; + port->out_fd = -1; + port->data = NULL; + + res = nxt_unit_port_hash_add(&lib->ports, port); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + + port_impl->process = req_impl->process; + + + nxt_queue_init(&port_impl->awaiting_req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); + + port_impl->use_count = 2; + port_impl->ready = 0; + + req->response_port = port; + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_use(port_impl->process); + + res = nxt_unit_get_port(ctx, port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; +} + + static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { @@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_process_ready_req(ctx_impl); + nxt_unit_ctx_release(ctx_impl); return rc; @@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) } +static void +nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_queue_t ready_req; + nxt_unit_impl_t *lib; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_init(&ready_req); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->ready_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return; + } + + nxt_queue_add(&ready_req, &ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->ready_req); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_queue_each(req_impl, &ready_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(&req_impl->req); + + } nxt_queue_loop; +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + int rc; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *new_port, *old_port_impl; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) *port = *old_port; + nxt_queue_init(&awaiting_req); + + old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } + + old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL @@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) lib->callbacks.add_port(ctx, old_port); } + nxt_queue_each(req_impl, &awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + return old_port; } @@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; + new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + + nxt_queue_init(&new_port->awaiting_req); process = NULL; @@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } +static int +nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_port_t get_port; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_PORT; + + m.get_port.id = port_id->id; + m.get_port.pid = port_id->pid; + + nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, + (int) port_id->id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) -- cgit From 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:13 +0300 Subject: Changing router to application shared memory exchange protocol. The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue. --- src/nxt_unit.c | 377 ++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 269 insertions(+), 108 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c1ef977f..b321a0d4 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, uint32_t id); static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, + nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, + nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_read_buf_s { - nxt_unit_read_buf_t *next; + nxt_queue_link_t link; + nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; char buf[16384]; char oob[256]; @@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_queue_t ready_req; - nxt_unit_read_buf_t *pending_read_head; - nxt_unit_read_buf_t **pending_read_tail; - nxt_unit_read_buf_t *free_read_buf; + /* of nxt_unit_read_buf_t */ + nxt_queue_t pending_rbuf; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t free_rbuf; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s { nxt_atomic_t use_count; + /* for nxt_unit_process_t.ports */ nxt_queue_link_t link; nxt_unit_process_t *process; @@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; }; @@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s { struct nxt_unit_process_s { pid_t pid; - nxt_queue_t ports; + nxt_queue_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_mmaps_t incoming; nxt_unit_mmaps_t outgoing; @@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); nxt_queue_init(&ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->free_rbuf); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); - ctx_impl->pending_read_head = NULL; - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; - ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) } -int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int rc; pid_t pid; @@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, rc = NXT_UNIT_ERROR; recv_msg.fd = -1; recv_msg.process = NULL; - port_msg = buf; - cm = oob; + port_msg = (nxt_port_msg_t *) rbuf->buf; + cm = (struct cmsghdr *) rbuf->oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) + if (cm->cmsg_len == CMSG_LEN(sizeof(int)) && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { @@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.incoming_buf = NULL; - if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { - nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.mmap = port_msg->mmap; recv_msg.start = port_msg + 1; - recv_msg.size = buf_size - sizeof(nxt_port_msg_t); + recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", @@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, goto fail; } - if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { - rc = NXT_UNIT_OK; + if (port_msg->tracking) { + rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - goto fail; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + + goto fail; + } } /* Fragmentation is unsupported. */ @@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { + rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + goto fail; } } @@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); res = nxt_unit_request_check_response_port(req, &port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } if (nxt_fast_path(res == NXT_UNIT_OK)) { lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t * nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) { nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); - return nxt_unit_read_buf_get_impl(ctx_impl); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; } static nxt_unit_read_buf_t * nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) { + nxt_queue_link_t *link; nxt_unit_read_buf_t *rbuf; - if (ctx_impl->free_read_buf != NULL) { - rbuf = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { + link = nxt_queue_first(&ctx_impl->free_rbuf); + nxt_queue_remove(link); - pthread_mutex_unlock(&ctx_impl->mutex); + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); return rbuf; } - pthread_mutex_unlock(&ctx_impl->mutex); - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + if (nxt_fast_path(rbuf != NULL)) { + rbuf->ctx_impl = ctx_impl; + } + return rbuf; } @@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, pthread_mutex_lock(&ctx_impl->mutex); - rbuf->next = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf; + nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - *ctx_impl->pending_read_tail = rbuf; - ctx_impl->pending_read_tail = &rbuf->next; - rbuf->next = NULL; + nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); @@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { - uint32_t cap; + uint32_t cap, n; + nxt_unit_mmap_t *e; + + if (nxt_fast_path(mmaps->size > i)) { + return mmaps->elts + i; + } cap = mmaps->cap; @@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) if (cap != mmaps->cap) { - mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); - if (nxt_slow_path(mmaps->elts == NULL)) { + e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); + if (nxt_slow_path(e == NULL)) { return NULL; } - memset(mmaps->elts + mmaps->cap, 0, - sizeof(*mmaps->elts) * (cap - mmaps->cap)); + mmaps->elts = e; + + for (n = mmaps->cap; n < cap; n++) { + e = mmaps->elts + n; + + e->hdr = NULL; + nxt_queue_init(&e->awaiting_rbuf); + } mmaps->cap = cap; } @@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) { - int rc; - void *mem; - struct stat mmap_stat; - nxt_unit_mmap_t *mm; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; + int rc; + void *mem; + nxt_queue_t awaiting_rbuf; + struct stat mmap_stat; + nxt_unit_mmap_t *mm; + nxt_unit_impl_t *lib; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr = mem; - if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { + if (nxt_slow_path(hdr->src_pid != pid)) { nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " "detected: %d != %d or %d != %d", (int) hdr->src_pid, @@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) goto fail; } + nxt_queue_init(&awaiting_rbuf); + pthread_mutex_lock(&process->incoming.mutex); mm = nxt_unit_mmap_at(&process->incoming, hdr->id); @@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr->sent_over = 0xFFFFu; + nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); + nxt_queue_init(&mm->awaiting_rbuf); + rc = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); + nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { + + ctx_impl = rbuf->ctx_impl; + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + fail: nxt_unit_process_release(process); @@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) } -static nxt_port_mmap_header_t * -nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - uint32_t id) -{ - nxt_port_mmap_header_t *hdr; - - if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - hdr = NULL; - } - - return hdr; -} - - static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) { - int rc; + int res; nxt_chunk_id_t c; nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; @@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", recv_msg->stream, (int) recv_msg->size); - return 0; + return NXT_UNIT_ERROR; } tracking_msg = recv_msg->start; @@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) process = nxt_unit_msg_get_process(ctx, recv_msg); if (nxt_slow_path(process == NULL)) { - return 0; + return NXT_UNIT_ERROR; } pthread_mutex_lock(&process->incoming.mutex); - hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - tracking_msg->mmap_id); + res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + recv_msg->pid, tracking_msg->mmap_id, + &hdr, rbuf); - return 0; + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); + res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - if (rc == 0) { + if (res == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + + res = NXT_UNIT_CANCELLED; + + } else { + res = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); - return rc; + return res; } static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, + nxt_unit_read_buf_t *rbuf) { + int res, need_rbuf; + nxt_unit_mmap_t *mm; + nxt_unit_ctx_impl_t *ctx_impl; + + mm = nxt_unit_mmap_at(mmaps, id); + if (nxt_slow_path(mm == NULL)) { + nxt_unit_alert(ctx, "failed to allocate mmap"); + + pthread_mutex_unlock(&mmaps->mutex); + + *hdr = NULL; + + return NXT_UNIT_ERROR; + } + + *hdr = mm->hdr; + + if (nxt_fast_path(*hdr != NULL)) { + return NXT_UNIT_OK; + } + + need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); + + nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); + + pthread_mutex_unlock(&mmaps->mutex); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + if (need_rbuf) { + res = nxt_unit_get_mmap(ctx, pid, id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_AGAIN; +} + + +static int +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) +{ + int res; void *start; uint32_t size; + nxt_unit_mmaps_t *mmaps; nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; @@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + /* Allocating buffer structures. */ for (; mmap_msg < end; mmap_msg++) { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", recv_msg->stream); + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } + return NXT_UNIT_ERROR; } @@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - pthread_mutex_lock(&process->incoming.mutex); + mmaps = &process->incoming; + + pthread_mutex_lock(&mmaps->mutex); for (; mmap_msg < end; mmap_msg++) { - hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + res = nxt_unit_check_rbuf_mmap(ctx, mmaps, + recv_msg->pid, mmap_msg->mmap_id, + &hdr, rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - mmap_msg->mmap_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } - return NXT_UNIT_ERROR; + return res; } start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); @@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (int) mmap_msg->size); } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&mmaps->mutex); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_mmap_t get_mmap; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_MMAP; + + m.get_mmap.id = id; + + nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } @@ -4110,6 +4247,7 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; + nxt_queue_link_t *link; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - if (ctx_impl->pending_read_head != NULL) { - rbuf = ctx_impl->pending_read_head; - ctx_impl->pending_read_head = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - if (ctx_impl->pending_read_tail == &rbuf->next) { - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - } +next_pending: + + link = nxt_queue_first(&ctx_impl->pending_rbuf); + nxt_queue_remove(link); + + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); pthread_mutex_unlock(&ctx_impl->mutex); } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + if (nxt_slow_path(rbuf == NULL)) { nxt_unit_ctx_release(ctx_impl); @@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, - rbuf->buf, rbuf->size, - rbuf->oob, sizeof(rbuf->oob)); + rc = nxt_unit_process_msg(ctx, rbuf); #if (NXT_DEBUG) - memset(rbuf->buf, 0xAC, rbuf->size); + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { + memset(rbuf->buf, 0xAC, rbuf->size); + } #endif } else { rc = NXT_UNIT_ERROR; } - nxt_unit_read_buf_release(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { + rc = NXT_UNIT_OK; + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { + rc = NXT_UNIT_OK; + } + + if (nxt_fast_path(rc == NXT_UNIT_OK)) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_process_ready_req(ctx_impl); + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + goto next_pending; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_unit_process_ready_req(ctx_impl); + } nxt_unit_ctx_release(ctx_impl); -- cgit From 83595606121a821f9e3cef0f0b7e7fe87eb1e50a Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:15 +0300 Subject: Introducing the shared application port. This is the port shared between all application processes which use it to pass requests for processing. Using it significantly simplifies the request processing code in the router. The drawback is 2 more file descriptors per each configured application and more complex libunit message wait/read code. --- src/nxt_unit.c | 467 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 361 insertions(+), 106 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b321a0d4..7fb2826d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); -nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); -nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, @@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); +static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); -static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, - nxt_unit_read_buf_t *rbuf); -static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); +static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); +static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); +static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port, void *buf, size_t buf_size, - void *oob, size_t oob_size); +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); @@ -308,6 +311,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_port_t *router_port; + nxt_unit_port_t *shared_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init) fail: - nxt_unit_ctx_release(&lib->main_ctx); + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; } @@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->use_count = 0; lib->router_port = NULL; + lib->shared_port = NULL; rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void -nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) { + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); } nxt_inline void -nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) { - long c; + long c; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); @@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_port_release(lib->router_port); } + if (nxt_fast_path(lib->shared_port != NULL)) { + nxt_unit_port_release(lib->shared_port); + } + free(lib); } } @@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) recv_msg.incoming_buf = NULL; if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + if (nxt_slow_path(rbuf->size == 0)) { + nxt_unit_debug(ctx, "read port closed"); + + nxt_unit_quit(ctx); + rc = NXT_UNIT_OK; + + goto fail; + } + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -946,6 +971,13 @@ fail: nxt_unit_process_release(recv_msg.process); } + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { +#if (NXT_DEBUG) + memset(rbuf->buf, 0xAC, rbuf->size); +#endif + nxt_unit_read_buf_release(ctx, rbuf); + } + return rc; } @@ -954,6 +986,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, recv_msg->fd); - nb = 0; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (new_port_msg->id == (nxt_port_id_t) -1) { + nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", - recv_msg->stream, recv_msg->fd, strerror(errno), errno); + new_port.in_fd = recv_msg->fd; + new_port.out_fd = -1; - return NXT_UNIT_ERROR; + } else { + nb = 0; + + if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", + recv_msg->stream, recv_msg->fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); + + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd; } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd; new_port.data = NULL; recv_msg->fd = -1; @@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_release(port); + if (new_port_msg->id == (nxt_port_id_t) -1) { + lib->shared_port = port; + + } else { + nxt_unit_port_release(port); + } return NXT_UNIT_OK; } @@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } if (nxt_fast_path(res == NXT_UNIT_OK)) { + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; + } + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->callbacks.request_handler(req); @@ -1220,6 +1275,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, } +static int +nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.stream = req_impl->stream; + msg.pid = lib->pid; + msg.reply_port = ctx_impl->read_port->id.id; + msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; + + res = nxt_unit_port_send(req->ctx, req->response_port, + &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { @@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - nxt_unit_read_buf(ctx, rbuf); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { nxt_unit_read_buf_release(ctx, rbuf); @@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + int rc; + nxt_unit_impl_t *lib; - nxt_unit_ctx_use(ctx_impl); + nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_run_once(ctx); + rc = nxt_unit_run_once_impl(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_ctx_release(ctx); return rc; } @@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { - int rc; - nxt_queue_link_t *link; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_read_buf_t *rbuf; + int rc; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx); - nxt_unit_ctx_use(ctx_impl); + rc = nxt_unit_run_once_impl(ctx); - pthread_mutex_lock(&ctx_impl->mutex); + nxt_unit_ctx_release(ctx); - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + return rc; +} -next_pending: - link = nxt_queue_first(&ctx_impl->pending_rbuf); - nxt_queue_remove(link); +static int +nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_read_buf_t *rbuf; - rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); - } else { - rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + return rc; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - if (nxt_slow_path(rbuf == NULL)) { + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_process_ready_req(ctx); - return NXT_UNIT_ERROR; - } + return rc; +} - nxt_unit_read_buf(ctx, rbuf); - } - if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, rbuf); +static int +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + int res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + struct pollfd fds[2]; -#if (NXT_DEBUG) - if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { - memset(rbuf->buf, 0xAC, rbuf->size); - } -#endif + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - } else { - rc = NXT_UNIT_ERROR; + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); } - if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { - rc = NXT_UNIT_OK; +retry: - } else { - nxt_unit_read_buf_release(ctx, rbuf); - } + fds[0].fd = ctx_impl->read_port->in_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; - if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { - rc = NXT_UNIT_OK; - } + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - pthread_mutex_lock(&ctx_impl->mutex); + res = poll(fds, 2, -1); + if (nxt_slow_path(res < 0)) { + err = errno; - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - goto next_pending; + if (err == EINTR) { + goto retry; } - pthread_mutex_unlock(&ctx_impl->mutex); + nxt_unit_alert(ctx, "poll() failed: %s (%d)", + strerror(err), err); - nxt_unit_process_ready_req(ctx_impl); + rbuf->size = -1; + + return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - nxt_unit_ctx_release(ctx_impl); + if ((fds[0].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + } - return rc; + if ((fds[1].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + } + + rbuf->size = -1; + + return NXT_UNIT_ERROR; } -static void -nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +static int +nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { + int rc; + nxt_queue_t pending_rbuf; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + nxt_queue_init(&pending_rbuf); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return NXT_UNIT_OK; + } + + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->pending_rbuf); - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + pthread_mutex_unlock(&ctx_impl->mutex); + + rc = NXT_UNIT_OK; + + nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { + + if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + } nxt_queue_loop; + + return rc; } static void -nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { nxt_queue_t ready_req; nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); if (nxt_queue_is_empty(&ctx_impl->ready_req)) { @@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + (void) nxt_unit_send_req_headers_ack(&req_impl->req); + lib->callbacks.request_handler(&req_impl->req); } nxt_queue_loop; } -void -nxt_unit_done(nxt_unit_ctx_t *ctx) +int +nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { + int rc; + nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - nxt_unit_ctx_release(ctx_impl); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_run_shared(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + + nxt_unit_ctx_use(ctx); + + rc = nxt_unit_process_port_msg_impl(ctx, port); + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +static int +nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + nxt_unit_read_buf_t *rbuf; + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + rc = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); + return rc; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_process_ready_req(ctx); + + return rc; +} + + +void +nxt_unit_done(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_release(ctx); } @@ -5056,12 +5295,11 @@ retry: } -static ssize_t +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, - void *buf, size_t buf_size, void *oob, size_t oob_size) + nxt_unit_read_buf_t *rbuf) { - int fd; - ssize_t res; + int fd, err; struct iovec iov[1]; struct msghdr msg; nxt_unit_impl_t *lib; @@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (lib->callbacks.port_recv != NULL) { - return lib->callbacks.port_recv(ctx, port, - buf, buf_size, oob, oob_size); + rbuf->size = lib->callbacks.port_recv(ctx, port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + if (nxt_slow_path(rbuf->size < 0)) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; } - iov[0].iov_base = buf; - iov[0].iov_len = buf_size; + iov[0].iov_base = rbuf->buf; + iov[0].iov_len = sizeof(rbuf->buf); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_flags = 0; - msg.msg_control = oob; - msg.msg_controllen = oob_size; + msg.msg_control = rbuf->oob; + msg.msg_controllen = sizeof(rbuf->oob); fd = port->in_fd; retry: - res = recvmsg(fd, &msg, 0); + rbuf->size = recvmsg(fd, &msg, 0); - if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + if (nxt_slow_path(rbuf->size == -1)) { + err = errno; + + if (err == EINTR) { goto retry; } + if (err == EAGAIN) { + nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(errno), errno); + + return NXT_UNIT_AGAIN; + } + nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", fd, strerror(errno), errno); - } else { - nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); + return NXT_UNIT_ERROR; } - return res; + nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); + + return NXT_UNIT_OK; } -- cgit From 2f3d27fa22d2e5566dfdeddfb6a1f8c927a5c73d Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:17 +0300 Subject: Process structures refactoring in runtime and libunit. Generic process-to-process shared memory exchange is no more required. Here, it is transformed into a router-to-application pattern. The outgoing shared memory segments collection is now the property of the application structure. The applications connect to the router only, and the process only needs to group the ports. --- src/nxt_unit.c | 296 ++++++++++++++++++--------------------------------------- 1 file changed, 93 insertions(+), 203 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7fb2826d..154fd480 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -70,8 +70,6 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( nxt_unit_ctx_t *ctx); static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); -static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, @@ -114,7 +112,6 @@ static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); @@ -137,7 +134,6 @@ static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); -nxt_inline nxt_unit_process_t *nxt_unit_port_process(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, @@ -179,7 +175,6 @@ struct nxt_unit_mmap_buf_s { nxt_port_mmap_header_t *hdr; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_process_t *process; char *free_ptr; char *plain_ptr; }; @@ -197,7 +192,6 @@ struct nxt_unit_recv_msg_s { uint32_t size; int fd; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *incoming_buf; }; @@ -217,8 +211,6 @@ struct nxt_unit_request_info_impl_s { uint32_t stream; - nxt_unit_process_t *process; - nxt_unit_mmap_buf_t *outgoing_buf; nxt_unit_mmap_buf_t *incoming_buf; @@ -296,6 +288,23 @@ struct nxt_unit_ctx_impl_s { }; +struct nxt_unit_mmap_s { + nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; +}; + + +struct nxt_unit_mmaps_s { + pthread_mutex_t mutex; + uint32_t size; + uint32_t cap; + nxt_atomic_t allocated_chunks; + nxt_unit_mmap_t *elts; +}; + + struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; @@ -315,6 +324,9 @@ struct nxt_unit_impl_s { nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ + nxt_unit_mmaps_t incoming; + nxt_unit_mmaps_t outgoing; + pid_t pid; int log_fd; int online; @@ -339,31 +351,11 @@ struct nxt_unit_port_impl_s { }; -struct nxt_unit_mmap_s { - nxt_port_mmap_header_t *hdr; - - /* of nxt_unit_read_buf_t */ - nxt_queue_t awaiting_rbuf; -}; - - -struct nxt_unit_mmaps_s { - pthread_mutex_t mutex; - uint32_t size; - uint32_t cap; - nxt_atomic_t allocated_chunks; - nxt_unit_mmap_t *elts; -}; - - struct nxt_unit_process_s { pid_t pid; nxt_queue_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_mmaps_t incoming; - nxt_unit_mmaps_t outgoing; - nxt_unit_impl_t *lib; nxt_atomic_t use_count; @@ -515,6 +507,9 @@ nxt_unit_create(nxt_unit_init_t *init) goto fail; } + nxt_unit_mmaps_init(&lib->incoming); + nxt_unit_mmaps_init(&lib->outgoing); + return lib; fail: @@ -640,6 +635,9 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_port_release(lib->shared_port); } + nxt_unit_mmaps_destroy(&lib->incoming); + nxt_unit_mmaps_destroy(&lib->outgoing); + free(lib); } } @@ -807,7 +805,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = NXT_UNIT_ERROR; recv_msg.fd = -1; - recv_msg.process = NULL; port_msg = (nxt_port_msg_t *) rbuf->buf; cm = (struct cmsghdr *) rbuf->oob; @@ -967,10 +964,6 @@ fail: nxt_unit_mmap_buf_free(recv_msg.incoming_buf); } - if (recv_msg.process != NULL) { - nxt_unit_process_release(recv_msg.process); - } - if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { #if (NXT_DEBUG) memset(rbuf->buf, 0xAC, rbuf->size); @@ -1109,14 +1102,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->content_buf = req->request_buf; req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); - /* "Move" process reference to req_impl. */ - req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(req_impl->process == NULL)) { - return NXT_UNIT_ERROR; - } - - recv_msg->process = NULL; - req_impl->stream = recv_msg->stream; req_impl->outgoing_buf = NULL; @@ -1174,6 +1159,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t *port; + nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_port_impl_t *port_impl; nxt_unit_request_info_impl_t *req_impl; @@ -1244,15 +1230,28 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, return NXT_UNIT_ERROR; } - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + process = nxt_unit_process_find(lib, port_id->pid, 0); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "check_response_port: process %d not found", + port->id.pid); + + nxt_unit_port_hash_find(&lib->ports, port_id, 1); - nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } - port_impl->process = req_impl->process; + nxt_queue_insert_tail(&process->ports, &port_impl->link); + port_impl->process = process; nxt_queue_init(&port_impl->awaiting_req); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); port_impl->use_count = 2; @@ -1262,8 +1261,6 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(port_impl->process); - res = nxt_unit_get_port(ctx, port_id); if (nxt_slow_path(res == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; @@ -1511,16 +1508,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->content_fd = -1; } - /* - * Process release should go after buffers release to guarantee mmap - * existence. - */ - if (req_impl->process != NULL) { - nxt_unit_process_release(req_impl->process); - - req_impl->process = NULL; - } - if (req->response_port != NULL) { nxt_unit_port_release(req->response_port); @@ -2111,32 +2098,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) } -static nxt_unit_process_t * -nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) -{ - nxt_unit_impl_t *lib; - - if (recv_msg->process != NULL) { - return recv_msg->process; - } - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - recv_msg->process = nxt_unit_process_find(lib, recv_msg->pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (recv_msg->process == NULL) { - nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", - recv_msg->stream, (int) recv_msg->pid); - } - - return recv_msg->process; -} - - static nxt_unit_mmap_buf_t * nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) { @@ -2398,12 +2359,11 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, mmap_buf->hdr = NULL; } - nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, (int) m.mmap_msg.chunk_id - (int) first_free_chunk); - nxt_unit_debug(req->ctx, "process %d allocated_chunks %d", - mmap_buf->process->pid, - (int) mmap_buf->process->outgoing.allocated_chunks); + nxt_unit_debug(req->ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } else { if (nxt_slow_path(mmap_buf->plain_ptr == NULL @@ -2463,7 +2423,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) { if (mmap_buf->hdr != NULL) { nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, - mmap_buf->process, mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); @@ -2881,7 +2840,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) mmap_buf->buf.start = mmap_buf->free_ptr; mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->process = NULL; res = read(req->content_fd, mmap_buf->free_ptr, size); if (res < 0) { @@ -3184,28 +3142,19 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - process = nxt_unit_port_process(port); - if (nxt_slow_path(process == NULL)) { - nxt_unit_alert(ctx, "mmap_get: port %d,%d already closed", - (int) port->id.pid, (int) port->id.id); - - return NULL; - } - - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); retry: - outgoing_size = process->outgoing.size; + outgoing_size = lib->outgoing.size; - mm_end = process->outgoing.elts + outgoing_size; + mm_end = lib->outgoing.elts + outgoing_size; - for (mm = process->outgoing.elts; mm < mm_end; mm++) { + for (mm = lib->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { @@ -3252,13 +3201,13 @@ retry: if (outgoing_size >= lib->shm_mmap_limit) { /* Cannot allocate more shared memory. */ - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); if (min_n == 0) { *n = 0; } - if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) { /* Memory allocated by application, but not send to router. */ @@ -3287,7 +3236,7 @@ retry: nxt_unit_debug(ctx, "oosm: retry"); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); goto retry; } @@ -3297,13 +3246,12 @@ retry: unlock: - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); return hdr; } @@ -3448,20 +3396,11 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; - process = nxt_unit_port_process(port); - if (nxt_slow_path(process == NULL)) { - nxt_unit_alert(ctx, "new_mmap: port %d,%d already closed", - (int) port->id.pid, (int) port->id.id); - - return NULL; - } - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); + mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size); if (nxt_slow_path(mm == NULL)) { nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); @@ -3538,9 +3477,9 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - hdr->id = process->outgoing.size - 1; + hdr->id = lib->outgoing.size - 1; hdr->src_pid = lib->pid; - hdr->dst_pid = process->pid; + hdr->dst_pid = port->id.pid; hdr->sent_over = port->id.id; /* Mark first n chunk(s) as busy */ @@ -3552,7 +3491,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); rc = nxt_unit_send_mmap(ctx, port, fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -3561,12 +3500,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) } else { nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", - hdr->id, (int) lib->pid, (int) process->pid); + hdr->id, (int) lib->pid, (int) port->id.pid); } close(fd); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); if (nxt_fast_path(hdr != NULL)) { return hdr; @@ -3574,7 +3513,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) remove_fail: - process->outgoing.size--; + lib->outgoing.size--; return NULL; } @@ -3662,7 +3601,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->process = nxt_unit_port_process(port); nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -3692,7 +3630,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; - mmap_buf->process = nxt_unit_port_process(port); mmap_buf->free_ptr = NULL; mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -3713,7 +3650,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) struct stat mmap_stat; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; nxt_port_mmap_header_t *hdr; @@ -3722,60 +3658,47 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_find(lib, pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", - (int) pid, fd); - - return NXT_UNIT_ERROR; - } - - rc = NXT_UNIT_ERROR; - if (fstat(fd, &mmap_stat) == -1) { - nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } hdr = mem; if (nxt_slow_path(hdr->src_pid != pid)) { - nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " - "detected: %d != %d or %d != %d", (int) hdr->src_pid, - (int) pid, (int) hdr->dst_pid, (int) lib->pid); + nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " + "detected: %d != %d or %d != %d", (int) hdr->src_pid, + (int) pid, (int) hdr->dst_pid, (int) lib->pid); munmap(mem, PORT_MMAP_SIZE); - goto fail; + return NXT_UNIT_ERROR; } nxt_queue_init(&awaiting_rbuf); - pthread_mutex_lock(&process->incoming.mutex); + pthread_mutex_lock(&lib->incoming.mutex); - mm = nxt_unit_mmap_at(&process->incoming, hdr->id); + mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); + nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); munmap(mem, PORT_MMAP_SIZE); + rc = NXT_UNIT_ERROR; + } else { mm->hdr = hdr; @@ -3787,7 +3710,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) rc = NXT_UNIT_OK; } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&lib->incoming.mutex); nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { @@ -3803,10 +3726,6 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) } nxt_queue_loop; -fail: - - nxt_unit_process_release(process); - return rc; } @@ -3840,9 +3759,6 @@ nxt_unit_process_release(nxt_unit_process_t *process) if (c == 1) { nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); - nxt_unit_mmaps_destroy(&process->incoming); - nxt_unit_mmaps_destroy(&process->outgoing); - free(process); } } @@ -3873,7 +3789,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, { int res; nxt_chunk_id_t c; - nxt_unit_process_t *process; + nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; nxt_port_mmap_tracking_msg_t *tracking_msg; @@ -3889,14 +3805,11 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, recv_msg->start = tracking_msg + 1; recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&process->incoming.mutex); + pthread_mutex_lock(&lib->incoming.mutex); - res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, recv_msg->pid, tracking_msg->mmap_id, &hdr, rbuf); @@ -3919,7 +3832,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, res = NXT_UNIT_OK; } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&lib->incoming.mutex); return res; } @@ -3979,8 +3892,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, int res; void *start; uint32_t size; + nxt_unit_impl_t *lib; nxt_unit_mmaps_t *mmaps; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; nxt_port_mmap_header_t *hdr; @@ -3992,11 +3905,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, return NXT_UNIT_ERROR; } - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } - mmap_msg = recv_msg->start; end = nxt_pointer_to(recv_msg->start, recv_msg->size); @@ -4023,7 +3931,9 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - mmaps = &process->incoming; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + mmaps = &lib->incoming; pthread_mutex_lock(&mmaps->mutex); @@ -4052,7 +3962,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; - b->process = process; b = b->next; @@ -4105,8 +4014,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) static void -nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size) { int freed_chunks; @@ -4132,12 +4040,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (hdr->src_pid == lib->pid && freed_chunks != 0) { - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, - -freed_chunks); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } if (hdr->dst_pid == lib->pid @@ -4241,9 +4147,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) nxt_queue_init(&process->ports); - nxt_unit_mmaps_init(&process->incoming); - nxt_unit_mmaps_init(&process->outgoing); - lhq.replace = 0; lhq.value = process; @@ -4255,8 +4158,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) default: nxt_unit_alert(NULL, "process %d insert failed", (int) pid); - pthread_mutex_destroy(&process->outgoing.mutex); - pthread_mutex_destroy(&process->incoming.mutex); free(process); process = NULL; break; @@ -4907,17 +4808,6 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) } -nxt_inline nxt_unit_process_t * -nxt_unit_port_process(nxt_unit_port_t *port) -{ - nxt_unit_port_impl_t *port_impl; - - port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - - return port_impl->process; -} - - static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { -- cgit From a1e9df2aef5a3917728c6fd37280b03020d51123 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:30 +0300 Subject: Port message extended to transfer 2 file descriptors. --- src/nxt_unit.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 154fd480..66aadd98 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -192,6 +192,7 @@ struct nxt_unit_recv_msg_s { uint32_t size; int fd; + int fd2; nxt_unit_mmap_buf_t *incoming_buf; }; @@ -805,14 +806,20 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = NXT_UNIT_ERROR; recv_msg.fd = -1; + recv_msg.fd2 = -1; port_msg = (nxt_port_msg_t *) rbuf->buf; cm = (struct cmsghdr *) rbuf->oob; - if (cm->cmsg_len == CMSG_LEN(sizeof(int)) - && cm->cmsg_level == SOL_SOCKET + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { - memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); + if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { + memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); + } + + if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { + memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); + } } recv_msg.incoming_buf = NULL; @@ -852,6 +859,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (rc == NXT_UNIT_AGAIN) { recv_msg.fd = -1; + recv_msg.fd2 = -1; } goto fail; @@ -871,6 +879,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (rc == NXT_UNIT_AGAIN) { recv_msg.fd = -1; + recv_msg.fd2 = -1; } goto fail; @@ -960,6 +969,10 @@ fail: close(recv_msg.fd); } + if (recv_msg.fd2 != -1) { + close(recv_msg.fd2); + } + while (recv_msg.incoming_buf != NULL) { nxt_unit_mmap_buf_free(recv_msg.incoming_buf); } -- cgit From e227fc9e6281c280c46139a81646ecd7b0510e2b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:34 +0300 Subject: Introducing application and port shared memory queues. The goal is to minimize the number of syscalls needed to deliver a message. --- src/nxt_unit.c | 1175 ++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 923 insertions(+), 252 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 66aadd98..1008a9d6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -7,6 +7,8 @@ #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_port_queue.h" +#include "nxt_app_queue.h" #include "nxt_unit.h" #include "nxt_unit_request.h" @@ -50,12 +52,15 @@ nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, + int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); @@ -92,6 +97,7 @@ static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n); +static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, @@ -103,8 +109,6 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); @@ -124,18 +128,22 @@ static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port); + nxt_unit_port_t *port, int queue_fd); nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port); + nxt_unit_port_t *port, void *queue); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -150,18 +158,28 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, + nxt_unit_read_buf_t *src); +static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); -static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl); -static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( - nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); +static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req); +static nxt_unit_request_info_t *nxt_unit_request_hash_find( + nxt_unit_ctx_t *ctx, uint32_t stream, int remove); static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); @@ -217,6 +235,7 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + uint8_t in_hash; /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; @@ -349,6 +368,11 @@ struct nxt_unit_port_impl_s { nxt_queue_t awaiting_req; int ready; + + void *queue; + + int from_socket; + nxt_unit_read_buf_t *socket_rbuf; }; @@ -375,7 +399,8 @@ typedef struct { nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc; + int rc, queue_fd; + void *mem; uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; @@ -386,6 +411,8 @@ nxt_unit_init(nxt_unit_init_t *init) return NULL; } + queue_fd = -1; + if (init->ready_port.id.pid != 0 && init->ready_stream != 0 && init->read_port.id.pid != 0) @@ -422,33 +449,58 @@ nxt_unit_init(nxt_unit_init_t *init) ctx = &lib->main_ctx.ctx; - lib->router_port = nxt_unit_add_port(ctx, &router_port); + lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; + } + + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_port_queue_init(mem); + + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } - rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } close(ready_port.out_fd); + close(queue_fd); return ctx; fail: + if (queue_fd != -1) { + close(queue_fd); + } + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; @@ -497,6 +549,7 @@ nxt_unit_create(nxt_unit_init_t *init) rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -505,6 +558,7 @@ nxt_unit_create(nxt_unit_init_t *init) if (cb->request_handler == NULL) { nxt_unit_alert(NULL, "request_handler is NULL"); + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -765,12 +819,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; nxt_port_msg_t msg; nxt_unit_impl_t *lib; + union { + struct cmsghdr cm; + char space[CMSG_SPACE(sizeof(int))]; + } cmsg; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); msg.stream = stream; @@ -783,7 +842,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); + memset(&cmsg, 0, sizeof(cmsg)); + + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_level = SOL_SOCKET; + cmsg.cm.cmsg_type = SCM_RIGHTS; + + /* + * memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -838,6 +915,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } + nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", + port_msg->stream, (int) port_msg->type, + recv_msg.fd, recv_msg.fd2); + recv_msg.stream = port_msg->stream; recv_msg.pid = port_msg->pid; recv_msg.reply_port = port_msg->reply_port; @@ -853,19 +934,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } - if (port_msg->tracking) { - rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - if (rc == NXT_UNIT_AGAIN) { - recv_msg.fd = -1; - recv_msg.fd2 = -1; - } - - goto fail; - } - } - /* Fragmentation is unsupported. */ if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", @@ -929,6 +997,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = nxt_unit_process_req_headers(ctx, &recv_msg); break; + case _NXT_PORT_MSG_REQ_BODY: + rc = nxt_unit_process_req_body(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_WEBSOCKET: rc = nxt_unit_process_websocket(ctx, &recv_msg); break; @@ -992,6 +1064,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + void *mem; nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -1013,9 +1086,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port_msg = recv_msg->start; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", recv_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, recv_msg->fd); + (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -1025,6 +1098,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = recv_msg->fd; new_port.out_fd = -1; + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); + } else { nb = 0; @@ -1041,14 +1117,23 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = -1; new_port.out_fd = recv_msg->fd; + + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); } + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, + strerror(errno), errno); + + return NXT_UNIT_ERROR; + } new_port.data = NULL; recv_msg->fd = -1; - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, mem); if (nxt_slow_path(port == NULL)) { return NXT_UNIT_ERROR; } @@ -1134,6 +1219,7 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; req_impl->websocket = 0; + req_impl->in_hash = 0; nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, (int) r->method_length, @@ -1151,12 +1237,82 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (nxt_fast_path(res == NXT_UNIT_OK)) { res = nxt_unit_send_req_headers_ack(req); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + return NXT_UNIT_OK; + } + } + + lib->callbacks.request_handler(req); + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + uint64_t l; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (req == NULL) { + return NXT_UNIT_OK; + } + + l = req->content_buf->end - req->content_buf->free; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + l += b->buf.end - b->buf.free; + } + + if (recv_msg->incoming_buf != NULL) { + b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + + /* "Move" incoming buffer list to req_impl. */ + nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + recv_msg->incoming_buf = NULL; + } + + req->content_fd = recv_msg->fd; + recv_msg->fd = -1; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.data_handler != NULL) { + lib->callbacks.data_handler(req); + + return NXT_UNIT_OK; + } + + if (req->content_fd != -1 || l == req->content_length) { lib->callbacks.request_handler(req); } @@ -1260,6 +1416,9 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_queue_insert_tail(&process->ports, &port_impl->link); port_impl->process = process; + port_impl->queue = NULL; + port_impl->from_socket = 0; + port_impl->socket_rbuf = NULL; nxt_queue_init(&port_impl->awaiting_req); @@ -1321,21 +1480,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) size_t hsize; nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *b; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, - recv_msg->last); - if (req_impl == NULL) { + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (nxt_slow_path(req == NULL)) { return NXT_UNIT_OK; } - req = &req_impl->req; + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); cb = &lib->callbacks; @@ -1501,12 +1656,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->websocket) { - nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - - req_impl->websocket = 0; + if (req_impl->in_hash) { + nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } + req_impl->websocket = 0; + while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -2170,7 +2325,6 @@ int nxt_unit_response_upgrade(nxt_unit_request_info_t *req) { int rc; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); @@ -2193,9 +2347,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); - - rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + rc = nxt_unit_request_hash_add(req->ctx, req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); @@ -2466,6 +2618,8 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + return rbuf; } @@ -2564,6 +2718,8 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, nxt_unit_request_info_impl_t *req_impl; char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + nxt_unit_req_debug(req, "write: %d", (int) size); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; @@ -2743,9 +2899,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); + nxt_unit_req_debug(req, "read: %d", (int) buf_res); + if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); - if (res < 0) { + if (nxt_slow_path(res < 0)) { nxt_unit_req_alert(req, "failed to read content: %s (%d)", strerror(errno), errno); @@ -3301,7 +3459,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) { - nxt_port_msg_t *port_msg; + int res; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -3313,21 +3471,15 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - - nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); - - if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); return NXT_UNIT_ERROR; } - port_msg = (nxt_port_msg_t *) rbuf->buf; - - if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + if (nxt_unit_is_shm_ack(rbuf)) { nxt_unit_read_buf_release(ctx, rbuf); - break; } @@ -3337,7 +3489,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); - if (port_msg->type == _NXT_PORT_MSG_QUIT) { + if (nxt_unit_is_quit(rbuf)) { nxt_unit_debug(ctx, "oosm: quit received"); return NXT_UNIT_ERROR; @@ -3406,7 +3558,6 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; - char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; @@ -3420,59 +3571,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) return NULL; } - snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", - lib->pid, (void *) pthread_self()); - -#if (NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", - strerror(errno), errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink(name); - - fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink(name) == -1)) { - nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, - strerror(errno), errno); - } - -#else - -#error No working shared memory implementation. - -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, - strerror(errno), errno); - goto remove_fail; } @@ -3481,6 +3581,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, strerror(errno), errno); + close(fd); + goto remove_fail; } @@ -3532,6 +3634,80 @@ remove_fail: } +static int +nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) +{ + int fd; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + char name[64]; + + snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", + lib->pid, (void *) pthread_self()); +#endif + +#if (NXT_HAVE_MEMFD_CREATE) + + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); + +#elif (NXT_HAVE_SHM_OPEN_ANON) + + fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", + strerror(errno), errno); + + return -1; + } + +#elif (NXT_HAVE_SHM_OPEN) + + /* Just in case. */ + shm_unlink(name); + + fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + if (nxt_slow_path(shm_unlink(name) == -1)) { + nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, + strerror(errno), errno); + } + +#else + +#error No working shared memory implementation. + +#endif + + if (nxt_slow_path(ftruncate(fd, size) == -1)) { + nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, + strerror(errno), errno); + + close(fd); + + return -1; + } + + return fd; +} + + static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { @@ -3797,63 +3973,8 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, - nxt_unit_read_buf_t *rbuf) -{ - int res; - nxt_chunk_id_t c; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_tracking_msg_t *tracking_msg; - - if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->stream, (int) recv_msg->size); - - return NXT_UNIT_ERROR; - } - - tracking_msg = recv_msg->start; - - recv_msg->start = tracking_msg + 1; - recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->incoming.mutex); - - res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, - recv_msg->pid, tracking_msg->mmap_id, - &hdr, rbuf); - - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; - } - - c = tracking_msg->tracking_id; - res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - - if (res == 0) { - nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->stream); - - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); - - res = NXT_UNIT_CANCELLED; - - } else { - res = NXT_UNIT_OK; - } - - pthread_mutex_unlock(&lib->incoming.mutex); - - return res; -} - - -static int -nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, - pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf) { int res, need_rbuf; @@ -4154,7 +4275,7 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) } process->pid = pid; - process->use_count = 1; + process->use_count = 2; process->next_port_id = 0; process->lib = lib; @@ -4176,8 +4297,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) break; } - nxt_unit_process_use(process); - return process; } @@ -4293,22 +4412,52 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - int res, err; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - struct pollfd fds[2]; + int nevents, res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + struct pollfd fds[2]; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + + return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } + port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, + port); + retry: + if (port_impl->from_socket == 0) { + res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + port_impl->from_socket); + + } else { + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + } + + res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; @@ -4317,31 +4466,47 @@ retry: fds[1].events = POLLIN; fds[1].revents = 0; - res = poll(fds, 2, -1); - if (nxt_slow_path(res < 0)) { + nevents = poll(fds, 2, -1); + if (nxt_slow_path(nevents == -1)) { err = errno; if (err == EINTR) { goto retry; } - nxt_unit_alert(ctx, "poll() failed: %s (%d)", - strerror(err), err); + nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", + fds[0].fd, fds[1].fd, strerror(err), err); rbuf->size = -1; return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); + if ((fds[0].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } if ((fds[1].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } - rbuf->size = -1; + nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); return NXT_UNIT_ERROR; } @@ -4392,9 +4557,11 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { + int res; nxt_queue_t ready_req; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); @@ -4419,7 +4586,35 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); - (void) nxt_unit_send_req_headers_ack(&req_impl->req); + req = &req_impl->req; + + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + continue; + } + } lib->callbacks.request_handler(&req_impl->req); @@ -4432,6 +4627,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { int rc; nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); @@ -4442,11 +4638,30 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + + retry: + rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4455,11 +4670,68 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) } +nxt_inline int +nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) +{ + if (nxt_fast_path(rbuf->size == 1)) { + return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_SHM_ACK; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_QUIT; + } + + return 0; +} + + int nxt_unit_run_shared(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_use(ctx); @@ -4467,11 +4739,35 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + + retry: + rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_read_buf_release(ctx, rbuf); + break; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4499,6 +4795,7 @@ static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; + nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; rbuf = nxt_unit_read_buf_get(ctx); @@ -4506,10 +4803,18 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - rc = nxt_unit_port_recv(ctx, port, rbuf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { +retry: + + if (port == lib->shared_port) { + rc = nxt_unit_shared_port_recv(ctx, port, rbuf); + + } else { + rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); + } + + if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); return rc; } @@ -4526,6 +4831,15 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_process_ready_req(ctx); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + if (lib->online) { + goto retry; + } + return rc; } @@ -4540,10 +4854,12 @@ nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *port; - nxt_unit_ctx_impl_t *new_ctx; + int rc, queue_fd; + void *mem; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *new_ctx; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4554,33 +4870,57 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } + rc = nxt_unit_ctx_init(lib, new_ctx, data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + free(new_ctx); + + return NULL; + } + + queue_fd = -1; + port = nxt_unit_create_port(ctx); if (nxt_slow_path(port == NULL)) { - free(new_ctx); + goto fail; + } - return NULL; + new_ctx->read_port = port; + + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; } - rc = nxt_unit_send_port(ctx, lib->router_port, port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + goto fail; } - rc = nxt_unit_ctx_init(lib, new_ctx, data); + nxt_port_queue_init(mem); + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + port_impl->queue = mem; + + rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - new_ctx->read_port = port; + close(queue_fd); return &new_ctx->ctx; fail: - nxt_unit_remove_port(lib, &port->id); - nxt_unit_port_release(port); + if (queue_fd != -1) { + close(queue_fd); + } - free(new_ctx); + nxt_unit_ctx_release(&new_ctx->ctx); return NULL; } @@ -4633,6 +4973,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_remove(&ctx_impl->link); if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -4709,10 +5050,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) nxt_unit_process_release(process); - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, NULL); if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "create_port: add_port() failed"); - close(port_sockets[0]); close(port_sockets[1]); } @@ -4723,10 +5062,11 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port) + nxt_unit_port_t *port, int queue_fd) { ssize_t res; nxt_unit_impl_t *lib; + int fds[2] = { port->out_fd, queue_fd }; struct { nxt_port_msg_t msg; @@ -4735,7 +5075,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, union { struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; + char space[CMSG_SPACE(sizeof(int) * 2)]; } cmsg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4758,7 +5098,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, memset(&cmsg, 0, sizeof(cmsg)); - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; @@ -4771,7 +5111,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. */ - memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); @@ -4799,7 +5139,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) c = nxt_atomic_fetch_add(&port_impl->use_count, -1); if (c == 1) { - nxt_unit_debug(NULL, "destroy port %d,%d", + nxt_unit_debug(NULL, "destroy port{%d,%d}", (int) port->id.pid, (int) port->id.id); nxt_unit_process_release(port_impl->process); @@ -4816,13 +5156,31 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) port->out_fd = -1; } + if (port->in_fd != -1) { + close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + + port->out_fd = -1; + } + + if (port_impl->queue != NULL) { + munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t)); + } + free(port_impl); } } static nxt_unit_port_t * -nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { int rc; nxt_queue_t awaiting_req; @@ -4840,9 +5198,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); if (nxt_slow_path(old_port != NULL)) { - nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", - port->id.pid, port->id.id, - port->in_fd, port->out_fd); + nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " + "in_fd %d out_fd %d queue %p", + port->id.pid, port->id.id, + port->in_fd, port->out_fd, queue); if (old_port->data == NULL) { old_port->data = port->data; @@ -4875,6 +5234,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + if (old_port_impl->queue == NULL) { + old_port_impl->queue = queue; + } + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); nxt_queue_init(&old_port_impl->awaiting_req); @@ -4914,9 +5277,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = NULL; - nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", + nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, - port->in_fd, port->out_fd); + port->in_fd, port->out_fd, queue); process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { @@ -4929,6 +5292,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { + nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", + port->id.pid, port->id.id); + goto unlock; } @@ -4951,6 +5317,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + new_port->queue = queue; + new_port->from_socket = 0; + new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); @@ -5010,13 +5379,13 @@ nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(NULL, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", (int) port_id->pid, (int) port_id->id); return NULL; } - nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->in_fd, port->out_fd, port->data); @@ -5089,10 +5458,12 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->online = 0; + if (lib->online) { + lib->online = 0; - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } } } @@ -5137,20 +5508,91 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - nxt_unit_impl_t *lib; - - nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", - (int) port->id.pid, (int) port->id.id, port->out_fd); + int notify; + ssize_t ret; + nxt_int_t rc; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + if (port_impl->queue != NULL && oob_size == 0 + && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) + { + rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", + (int) port->id.pid, (int) port->id.id, + (int) buf_size, notify); + + if (notify) { + memcpy(&msg, buf, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_READ_QUEUE; + + if (lib->callbacks.port_send == NULL) { + ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = lib->callbacks.port_send(ctx, port, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + } + + } + + return buf_size; + } + + if (port_impl->queue != NULL) { + msg.type = _NXT_PORT_MSG_READ_SOCKET; + + rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", + (int) port->id.pid, (int) port->id.id, notify); + } + if (lib->callbacks.port_send != NULL) { - return lib->callbacks.port_send(ctx, port, buf, buf_size, - oob, oob_size); + ret = lib->callbacks.port_send(ctx, port, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); } - return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, - oob, oob_size); + return ret; } @@ -5158,6 +5600,7 @@ static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { + int err; ssize_t res; struct iovec iov[1]; struct msghdr msg; @@ -5178,7 +5621,9 @@ retry: res = sendmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + err = errno; + + if (err == EINTR) { goto retry; } @@ -5187,7 +5632,7 @@ retry: * implementation. */ nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", - fd, (int) buf_size, strerror(errno), errno); + fd, (int) buf_size, strerror(err), err); } else { nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, @@ -5198,6 +5643,158 @@ retry: } +static int +nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res, read; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + read = 0; + +retry: + + if (port_impl->from_socket > 0) { + if (port_impl->socket_rbuf != NULL + && port_impl->socket_rbuf->size > 0) + { + port_impl->from_socket--; + + nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); + port_impl->socket_rbuf->size = 0; + + nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + + } else { + res = nxt_unit_port_queue_recv(port, rbuf); + + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) port->id.pid, (int) port->id.id, + port_impl->from_socket); + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + + if (read) { + return NXT_UNIT_AGAIN; + } + + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + read = 1; + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + if (port_impl->from_socket) { + nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); + } + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (res == NXT_UNIT_AGAIN) { + return NXT_UNIT_AGAIN; + } + + if (port_impl->from_socket > 0) { + port_impl->from_socket--; + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (port_impl->socket_rbuf == NULL) { + port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); + + if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + port_impl->socket_rbuf->size = 0; + } + + if (port_impl->socket_rbuf->size > 0) { + nxt_unit_alert(ctx, "too many port socket messages"); + + return NXT_UNIT_ERROR; + } + + nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + goto retry; +} + + +nxt_inline void +nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) +{ + memcpy(dst->buf, src->buf, src->size); + dst->size = src->size; + memcpy(dst->oob, src->oob, sizeof(src->oob)); +} + + +static int +nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res; + +retry: + + res = nxt_unit_app_queue_recv(port, rbuf); + + if (res == NXT_UNIT_AGAIN) { + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + goto retry; + } + } + + return res; +} + + static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) @@ -5214,6 +5811,9 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, rbuf->buf, sizeof(rbuf->buf), rbuf->oob, sizeof(rbuf->oob)); + nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + if (nxt_slow_path(rbuf->size < 0)) { return NXT_UNIT_ERROR; } @@ -5247,13 +5847,13 @@ retry: if (err == EAGAIN) { nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + fd, strerror(err), err); return NXT_UNIT_AGAIN; } nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + fd, strerror(err), err); return NXT_UNIT_ERROR; } @@ -5264,6 +5864,52 @@ retry: } +static int +nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + +static int +nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + uint32_t cookie; + nxt_port_msg_t *port_msg; + nxt_app_queue_t *queue; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + queue = port_impl->queue; + +retry: + + rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); + + nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); + + if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + return NXT_UNIT_OK; + } + + nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); + + goto retry; + } + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -5392,12 +6038,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { static int -nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl) +nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req) { - uint32_t *stream; - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (req_impl->in_hash) { + return NXT_UNIT_OK; + } stream = &req_impl->stream; @@ -5409,11 +6062,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, lhq.replace = 0; lhq.value = req_impl; - res = nxt_lvlhsh_insert(request_hash, &lhq); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); + + pthread_mutex_unlock(&ctx_impl->mutex); switch (res) { case NXT_OK: + req_impl->in_hash = 1; return NXT_UNIT_OK; default: @@ -5422,12 +6082,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, } -static nxt_unit_request_info_impl_t * -nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, - int remove) +static nxt_unit_request_info_t * +nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) { - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); lhq.key.length = sizeof(stream); @@ -5435,16 +6096,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, lhq.proto = &lvlhsh_requests_proto; lhq.pool = NULL; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + if (remove) { - res = nxt_lvlhsh_delete(request_hash, &lhq); + res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); } else { - res = nxt_lvlhsh_find(request_hash, &lhq); + res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); } + pthread_mutex_unlock(&ctx_impl->mutex); + switch (res) { case NXT_OK: + req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, + req); + req_impl->in_hash = 0; + return lhq.value; default: -- cgit From 8cf522bf2d8c2d3bcef88ab86c93e06c6afcb6ae Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:36 +0300 Subject: Wrapping close() call in libunit for logging. --- src/nxt_unit.c | 79 ++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 44 insertions(+), 35 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 1008a9d6..990c789c 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -170,6 +170,7 @@ static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_close(int fd); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); @@ -490,15 +491,15 @@ nxt_unit_init(nxt_unit_init_t *init) goto fail; } - close(ready_port.out_fd); - close(queue_fd); + nxt_unit_close(ready_port.out_fd); + nxt_unit_close(queue_fd); return ctx; fail: if (queue_fd != -1) { - close(queue_fd); + nxt_unit_close(queue_fd); } nxt_unit_ctx_release(&lib->main_ctx.ctx); @@ -1038,11 +1039,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) fail: if (recv_msg.fd != -1) { - close(recv_msg.fd); + nxt_unit_close(recv_msg.fd); } if (recv_msg.fd2 != -1) { - close(recv_msg.fd2); + nxt_unit_close(recv_msg.fd2); } while (recv_msg.incoming_buf != NULL) { @@ -1671,7 +1672,7 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) } if (req->content_fd != -1) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } @@ -2911,7 +2912,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) } if (res < (ssize_t) size) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } @@ -3023,7 +3024,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) } if (res < (ssize_t) size) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } @@ -3581,7 +3582,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, strerror(errno), errno); - close(fd); + nxt_unit_close(fd); goto remove_fail; } @@ -3618,7 +3619,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) hdr->id, (int) lib->pid, (int) port->id.pid); } - close(fd); + nxt_unit_close(fd); pthread_mutex_lock(&lib->outgoing.mutex); @@ -3699,7 +3700,7 @@ nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, strerror(errno), errno); - close(fd); + nxt_unit_close(fd); return -1; } @@ -4910,14 +4911,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) goto fail; } - close(queue_fd); + nxt_unit_close(queue_fd); return &new_ctx->ctx; fail: if (queue_fd != -1) { - close(queue_fd); + nxt_unit_close(queue_fd); } nxt_unit_ctx_release(&new_ctx->ctx); @@ -5034,8 +5035,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); - close(port_sockets[0]); - close(port_sockets[1]); + nxt_unit_close(port_sockets[0]); + nxt_unit_close(port_sockets[1]); return NULL; } @@ -5052,8 +5053,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) port = nxt_unit_add_port(ctx, &new_port, NULL); if (nxt_slow_path(port == NULL)) { - close(port_sockets[0]); - close(port_sockets[1]); + nxt_unit_close(port_sockets[0]); + nxt_unit_close(port_sockets[1]); } return port; @@ -5139,31 +5140,20 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) c = nxt_atomic_fetch_add(&port_impl->use_count, -1); if (c == 1) { - nxt_unit_debug(NULL, "destroy port{%d,%d}", - (int) port->id.pid, (int) port->id.id); + nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d", + (int) port->id.pid, (int) port->id.id, + port->in_fd, port->out_fd); nxt_unit_process_release(port_impl->process); if (port->in_fd != -1) { - close(port->in_fd); + nxt_unit_close(port->in_fd); port->in_fd = -1; } if (port->out_fd != -1) { - close(port->out_fd); - - port->out_fd = -1; - } - - if (port->in_fd != -1) { - close(port->in_fd); - - port->in_fd = -1; - } - - if (port->out_fd != -1) { - close(port->out_fd); + nxt_unit_close(port->out_fd); port->out_fd = -1; } @@ -5214,7 +5204,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) } if (port->in_fd != -1) { - close(port->in_fd); + nxt_unit_close(port->in_fd); port->in_fd = -1; } @@ -5224,7 +5214,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) } if (port->out_fd != -1) { - close(port->out_fd); + nxt_unit_close(port->out_fd); port->out_fd = -1; } @@ -5910,6 +5900,25 @@ retry: } +nxt_inline int +nxt_unit_close(int fd) +{ + int res; + + res = close(fd); + + if (nxt_slow_path(res == -1)) { + nxt_unit_alert(NULL, "close(%d) failed: %s (%d)", + fd, strerror(errno), errno); + + } else { + nxt_unit_debug(NULL, "close(%d): %d", fd, res); + } + + return res; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { -- cgit From acb0cca49def92563d9b221d818b541b60e30eaa Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 21:48:16 +0300 Subject: Moving file descriptor blocking to libunit. The default libunit behavior relies on blocking the recv() call for port file descriptors, which an application may override if needed. For external applications, port file descriptors were toggled to blocking mode before the exec() call. If the exec() call failed, descriptor remained blocked, so the process hanged while trying to read from it. This patch moves file descriptor mode switch inside libunit. --- src/nxt_unit.c | 50 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 11 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 990c789c..b063058f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -171,6 +171,7 @@ static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline int nxt_unit_close(int fd); +static int nxt_unit_fd_blocking(int fd); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); @@ -413,6 +414,7 @@ nxt_unit_init(nxt_unit_init_t *init) } queue_fd = -1; + mem = MAP_FAILED; if (init->ready_port.id.pid != 0 && init->ready_stream != 0 @@ -450,6 +452,11 @@ nxt_unit_init(nxt_unit_init_t *init) ctx = &lib->main_ctx.ctx; + rc = nxt_unit_fd_blocking(router_port.out_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); @@ -473,12 +480,20 @@ nxt_unit_init(nxt_unit_init_t *init) nxt_port_queue_init(mem); + rc = nxt_unit_fd_blocking(read_port.in_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); - munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; + } + rc = nxt_unit_fd_blocking(ready_port.out_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -486,8 +501,6 @@ nxt_unit_init(nxt_unit_init_t *init) if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); - munmap(mem, sizeof(nxt_port_queue_t)); - goto fail; } @@ -498,6 +511,10 @@ nxt_unit_init(nxt_unit_init_t *init) fail: + if (mem != MAP_FAILED) { + munmap(mem, sizeof(nxt_port_queue_t)); + } + if (queue_fd != -1) { nxt_unit_close(queue_fd); } @@ -1064,7 +1081,6 @@ fail: static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { - int nb; void *mem; nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; @@ -1103,13 +1119,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) MAP_SHARED, recv_msg->fd2, 0); } else { - nb = 0; - - if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", - recv_msg->stream, recv_msg->fd, strerror(errno), errno); - + if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) { return NXT_UNIT_ERROR; } @@ -5919,6 +5929,24 @@ nxt_unit_close(int fd) } +static int +nxt_unit_fd_blocking(int fd) +{ + int nb; + + nb = 0; + + if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", + fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { -- cgit From f147943f6382c0e90a216615ff9bcf57a3db8c75 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 21:48:27 +0300 Subject: Style fixes for 2 file descriptors transfer over port. Two consecutive fd and fd2 fields replaced with array. --- src/nxt_unit.c | 71 +++++++++++++++++++++++++++++----------------------------- 1 file changed, 36 insertions(+), 35 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b063058f..8dd03b82 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -211,8 +211,7 @@ struct nxt_unit_recv_msg_s { void *start; uint32_t size; - int fd; - int fd2; + int fd[2]; nxt_unit_mmap_buf_t *incoming_buf; }; @@ -900,8 +899,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_ERROR; - recv_msg.fd = -1; - recv_msg.fd2 = -1; + recv_msg.fd[0] = -1; + recv_msg.fd[1] = -1; port_msg = (nxt_port_msg_t *) rbuf->buf; cm = (struct cmsghdr *) rbuf->oob; @@ -909,11 +908,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) && cm->cmsg_type == SCM_RIGHTS) { if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { - memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); + memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); } if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { - memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); + memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); } } @@ -933,9 +932,9 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } - nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", + nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", port_msg->stream, (int) port_msg->type, - recv_msg.fd, recv_msg.fd2); + recv_msg.fd[0], recv_msg.fd[1]); recv_msg.stream = port_msg->stream; recv_msg.pid = port_msg->pid; @@ -964,8 +963,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (rc == NXT_UNIT_AGAIN) { - recv_msg.fd = -1; - recv_msg.fd2 = -1; + recv_msg.fd[0] = -1; + recv_msg.fd[1] = -1; } goto fail; @@ -987,11 +986,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) case _NXT_PORT_MSG_CHANGE_FILE: nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", - port_msg->stream, recv_msg.fd); + port_msg->stream, recv_msg.fd[0]); - if (dup2(recv_msg.fd, lib->log_fd) == -1) { + if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", - port_msg->stream, recv_msg.fd, lib->log_fd, + port_msg->stream, recv_msg.fd[0], lib->log_fd, strerror(errno), errno); goto fail; @@ -1001,14 +1000,14 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; case _NXT_PORT_MSG_MMAP: - if (nxt_slow_path(recv_msg.fd < 0)) { + if (nxt_slow_path(recv_msg.fd[0] < 0)) { nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", - port_msg->stream, recv_msg.fd); + port_msg->stream, recv_msg.fd[0]); goto fail; } - rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); + rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); break; case _NXT_PORT_MSG_REQ_HEADERS: @@ -1055,12 +1054,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) fail: - if (recv_msg.fd != -1) { - nxt_unit_close(recv_msg.fd); + if (recv_msg.fd[0] != -1) { + nxt_unit_close(recv_msg.fd[0]); } - if (recv_msg.fd2 != -1) { - nxt_unit_close(recv_msg.fd2); + if (recv_msg.fd[1] != -1) { + nxt_unit_close(recv_msg.fd[1]); } while (recv_msg.incoming_buf != NULL) { @@ -1094,32 +1093,34 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (nxt_slow_path(recv_msg->fd < 0)) { + if (nxt_slow_path(recv_msg->fd[0] < 0)) { nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", - recv_msg->stream, recv_msg->fd); + recv_msg->stream, recv_msg->fd[0]); return NXT_UNIT_ERROR; } new_port_msg = recv_msg->start; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", + nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d", recv_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); + (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (new_port_msg->id == (nxt_port_id_t) -1) { nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - new_port.in_fd = recv_msg->fd; + new_port.in_fd = recv_msg->fd[0]; new_port.out_fd = -1; mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd2, 0); + MAP_SHARED, recv_msg->fd[1], 0); } else { - if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) { + if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) + != NXT_UNIT_OK)) + { return NXT_UNIT_ERROR; } @@ -1127,14 +1128,14 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port_msg->id); new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd; + new_port.out_fd = recv_msg->fd[0]; mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd2, 0); + MAP_SHARED, recv_msg->fd[1], 0); } if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], strerror(errno), errno); return NXT_UNIT_ERROR; @@ -1142,7 +1143,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.data = NULL; - recv_msg->fd = -1; + recv_msg->fd[0] = -1; port = nxt_unit_add_port(ctx, &new_port, mem); if (nxt_slow_path(port == NULL)) { @@ -1224,8 +1225,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req_impl->incoming_buf->prev = &req_impl->incoming_buf; recv_msg->incoming_buf = NULL; - req->content_fd = recv_msg->fd; - recv_msg->fd = -1; + req->content_fd = recv_msg->fd[0]; + recv_msg->fd[0] = -1; req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; @@ -1312,8 +1313,8 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->incoming_buf = NULL; } - req->content_fd = recv_msg->fd; - recv_msg->fd = -1; + req->content_fd = recv_msg->fd[0]; + recv_msg->fd[0] = -1; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); -- cgit From fd2c01c58f5f3bfd357e9931a9abb64083afc3ac Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 21:48:46 +0300 Subject: Fixing return value initialization. --- src/nxt_unit.c | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 8dd03b82..6b7d631d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -898,7 +898,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - rc = NXT_UNIT_ERROR; recv_msg.fd[0] = -1; recv_msg.fd[1] = -1; port_msg = (nxt_port_msg_t *) rbuf->buf; @@ -924,12 +923,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_quit(ctx); rc = NXT_UNIT_OK; - - goto fail; + goto done; } nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); - goto fail; + + rc = NXT_UNIT_ERROR; + goto done; } nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", @@ -946,16 +946,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { - nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", - port_msg->stream, (int) port_msg->type); - goto fail; + nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", + port_msg->stream, (int) port_msg->type); + rc = NXT_UNIT_ERROR; + goto done; } /* Fragmentation is unsupported. */ if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { - nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", - port_msg->stream, (int) port_msg->type); - goto fail; + nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", + port_msg->stream, (int) port_msg->type); + rc = NXT_UNIT_ERROR; + goto done; } if (port_msg->mmap) { @@ -967,7 +969,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) recv_msg.fd[1] = -1; } - goto fail; + goto done; } } @@ -993,7 +995,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) port_msg->stream, recv_msg.fd[0], lib->log_fd, strerror(errno), errno); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } rc = NXT_UNIT_OK; @@ -1004,7 +1007,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", port_msg->stream, recv_msg.fd[0]); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); @@ -1024,11 +1028,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) case _NXT_PORT_MSG_REMOVE_PID: if (nxt_slow_path(recv_msg.size != sizeof(pid))) { - nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " - "(%d != %d)", port_msg->stream, (int) recv_msg.size, - (int) sizeof(pid)); + nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " + "(%d != %d)", port_msg->stream, (int) recv_msg.size, + (int) sizeof(pid)); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } memcpy(&pid, recv_msg.start, sizeof(pid)); @@ -1049,10 +1054,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } -fail: +done: if (recv_msg.fd[0] != -1) { nxt_unit_close(recv_msg.fd[0]); -- cgit