From 9ea4be7e4ec5d0d24cddb5868aa4920f0298ee67 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 28 Jun 2019 12:19:48 +0300 Subject: Fixing allocation alignment for port fragments. All allocated blocks for lvlhash required to be aligned because lower address bits used for various extra information. Using unaligned blocks may cause invalid memory aceess. This was issue found on buildbot running large configuration tests. --- src/nxt_port_socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index c9b5105b..fe113a68 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -668,7 +668,7 @@ nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) static void * nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) { - return nxt_mp_alloc(ctx, size); + return nxt_mp_align(ctx, size, size); } -- cgit From caea9d3c07543fecf9035ff2b406c190b714989e Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 16 Aug 2019 00:48:11 +0300 Subject: Fixing multi-thread port write racing conditions. --- src/nxt_port_socket.c | 308 ++++++++++++++++++++++++++------------------------ 1 file changed, 159 insertions(+), 149 deletions(-) (limited to 'src/nxt_port_socket.c') diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index fe113a68..4edc423a 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -7,9 +7,15 @@ #include +static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg); +static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); +static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); +static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, + nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); @@ -116,13 +122,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; - - if (port->iov == NULL) { - port->iov = nxt_mp_get(port->mem_pool, - sizeof(struct iovec) * NXT_IOBUF_MAX * 10); - port->mmsg_buf = nxt_mp_get(port->mem_pool, - sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10); - } } @@ -135,109 +134,11 @@ nxt_port_write_close(nxt_port_t *port) static void -nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_engine_t *engine; - nxt_port_send_msg_t *msg; - - msg = obj; - engine = data; - - nxt_assert(data == msg->work.data); - - if (engine != task->thread->engine) { - - nxt_debug(task, "current thread is %PT, expected %PT", - task->thread->tid, engine->task.thread->tid); - - nxt_event_engine_post(engine, &msg->work); - - return; - } - - nxt_mp_free(engine->mem_pool, obj); - nxt_mp_release(engine->mem_pool); -} - - -static nxt_port_send_msg_t * -nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) -{ - nxt_mp_t *mp; - nxt_port_send_msg_t *msg; - - mp = task->thread->engine->mem_pool; - - msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t)); - if (nxt_slow_path(msg == NULL)) { - return NULL; - } - - nxt_mp_retain(mp); - - msg->link.next = NULL; - msg->link.prev = NULL; - - msg->buf = m->buf; - msg->share = m->share; - msg->fd = m->fd; - msg->close_fd = m->close_fd; - msg->port_msg = m->port_msg; - - msg->work.next = NULL; - msg->work.handler = nxt_port_release_send_msg; - msg->work.task = task; - msg->work.obj = msg; - msg->work.data = task->thread->engine; - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg) +nxt_port_release_send_msg(nxt_port_send_msg_t *msg) { - if (msg->work.data == NULL) { - msg = nxt_port_msg_create(task, msg); + if (msg->allocated) { + nxt_free(msg); } - - if (msg != NULL) { - nxt_queue_insert_head(&port->messages, &msg->link); - } - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg) -{ - if (msg->work.data == NULL) { - msg = nxt_port_msg_create(task, msg); - } - - if (msg != NULL) { - nxt_queue_insert_tail(&port->messages, &msg->link); - } - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) -{ - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&port->messages); - - if (lnk == nxt_queue_tail(&port->messages)) { - return msg; - } - - return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); } @@ -246,15 +147,17 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, void *tracking) { - nxt_port_send_msg_t msg, *res; + nxt_int_t res; + nxt_port_send_msg_t msg; msg.link.next = NULL; msg.link.prev = NULL; msg.buf = b; + msg.share = 0; msg.fd = fd; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; - msg.share = 0; + msg.allocated = 0; if (tracking != NULL) { nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); @@ -270,25 +173,63 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mf = 0; msg.port_msg.tracking = tracking != NULL; - msg.work.data = NULL; - - if (port->socket.write_ready) { + res = nxt_port_msg_chk_insert(task, port, &msg); + if (nxt_fast_path(res == NXT_DECLINED)) { nxt_port_write_handler(task, &port->socket, &msg); - } else { - nxt_thread_mutex_lock(&port->write_mutex); + res = NXT_OK; + } + + return res; +} + - res = nxt_port_msg_insert_tail(task, port, &msg); +static nxt_int_t +nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg) +{ + nxt_int_t res; + + nxt_thread_mutex_lock(&port->write_mutex); + + if (nxt_fast_path(port->socket.write_ready + && nxt_queue_is_empty(&port->messages))) + { + res = NXT_DECLINED; + + } else { + msg = nxt_port_msg_alloc(msg); - nxt_thread_mutex_unlock(&port->write_mutex); + if (nxt_fast_path(msg != NULL)) { + nxt_queue_insert_tail(&port->messages, &msg->link); + nxt_port_use(task, port, 1); + res = NXT_OK; - if (res == NULL) { - return NXT_ERROR; + } else { + res = NXT_ERROR; } + } + + nxt_thread_mutex_unlock(&port->write_mutex); + + return res; +} + + +static nxt_port_send_msg_t * +nxt_port_msg_alloc(nxt_port_send_msg_t *m) +{ + nxt_port_send_msg_t *msg; - nxt_port_use(task, port, 1); + msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); + if (nxt_slow_path(msg == NULL)) { + return NULL; } - return NXT_OK; + *msg = *m; + + msg->allocated = 1; + + return msg; } @@ -312,9 +253,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) int use_delta; size_t plain_size; ssize_t n; + uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; nxt_bool_t block_write, enable_write; nxt_port_t *port; - struct iovec *iov; + struct iovec iov[NXT_IOBUF_MAX * 10]; nxt_work_queue_t *wq; nxt_port_method_t m; nxt_port_send_msg_t *msg; @@ -326,20 +268,23 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) enable_write = 0; use_delta = 0; - nxt_thread_mutex_lock(&port->write_mutex); - - iov = port->iov; - wq = &task->thread->engine->fast_work_queue; do { - msg = nxt_port_msg_first(task, port, data); + if (data) { + msg = data; - if (msg == NULL) { - block_write = 1; - goto unlock_mutex; + } else { + msg = nxt_port_msg_first(port); + + if (msg == NULL) { + block_write = 1; + goto cleanup; + } } +next_fragment: + iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); @@ -377,7 +322,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) * is bigger than PORT_MMAP_MIN_SIZE. */ if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { - nxt_port_mmap_write(task, port, msg, &sb); + nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); } else { m = NXT_PORT_METHOD_PLAIN; @@ -402,7 +347,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, - m == NXT_PORT_METHOD_MMAP); + m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, @@ -421,36 +366,58 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->share = 0; if (msg->link.next != NULL) { + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_remove(&msg->link); - use_delta--; - } - data = NULL; + nxt_queue_insert_tail(&port->messages, &msg->link); + + nxt_thread_mutex_unlock(&port->write_mutex); + + } else { + msg = nxt_port_msg_insert_tail(port, msg); + if (nxt_slow_path(msg == NULL)) { + goto fail; + } - if (nxt_port_msg_insert_tail(task, port, msg) != NULL) { use_delta++; } + + } else { + goto next_fragment; } } else { if (msg->link.next != NULL) { + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_remove(&msg->link); + msg->link.next = NULL; + + nxt_thread_mutex_unlock(&port->write_mutex); + use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->work.data); } - data = NULL; + + nxt_port_release_send_msg(msg); } - } else { - if (msg->link.next == NULL) { - if (nxt_port_msg_insert_head(task, port, msg) != NULL) { - use_delta++; - } + if (data != NULL) { + goto cleanup; } + } else { if (nxt_slow_path(n == NXT_ERROR)) { goto fail; } + + if (msg->link.next == NULL) { + msg = nxt_port_msg_insert_tail(port, msg); + if (nxt_slow_path(msg == NULL)) { + goto fail; + } + + use_delta++; + } } } while (port->socket.write_ready); @@ -459,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) enable_write = 1; } - goto unlock_mutex; + goto cleanup; fail: @@ -468,8 +435,7 @@ fail: nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, &port->socket); -unlock_mutex: - nxt_thread_mutex_unlock(&port->write_mutex); +cleanup: if (block_write && nxt_fd_event_is_active(port->socket.write)) { nxt_port_post(task, port, nxt_port_fd_block_write, NULL); @@ -485,6 +451,29 @@ unlock_mutex: } +static nxt_port_send_msg_t * +nxt_port_msg_first(nxt_port_t *port) +{ + nxt_queue_link_t *lnk; + nxt_port_send_msg_t *msg; + + nxt_thread_mutex_lock(&port->write_mutex); + + lnk = nxt_queue_first(&port->messages); + + if (lnk == nxt_queue_tail(&port->messages)) { + msg = NULL; + + } else { + msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); + } + + nxt_thread_mutex_unlock(&port->write_mutex); + + return msg; +} + + static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) @@ -546,6 +535,27 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, } +static nxt_port_send_msg_t * +nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + if (msg->allocated == 0) { + msg = nxt_port_msg_alloc(msg); + + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + } + + nxt_thread_mutex_lock(&port->write_mutex); + + nxt_queue_insert_tail(&port->messages, &msg->link); + + nxt_thread_mutex_unlock(&port->write_mutex); + + return msg; +} + + void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) { @@ -986,8 +996,8 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&msg->link); use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->work.data); + + nxt_port_release_send_msg(msg); } nxt_queue_loop; -- cgit