From 7574c64992b98d3dfbc3dd101bd0f7d78bad0823 Mon Sep 17 00:00:00 2001 From: Igor Sysoev Date: Wed, 14 Jun 2017 15:18:52 +0300 Subject: nxt_event_conn_... functions and structures have been renamed to nxt_conn_... --- src/nxt_application.c | 48 +- src/nxt_application.h | 2 +- src/nxt_conn.c | 152 ++++++ src/nxt_conn.h | 350 +++++++++++++ src/nxt_conn_accept.c | 366 +++++++++++++ src/nxt_conn_close.c | 12 +- src/nxt_conn_connect.c | 194 +++++++ src/nxt_conn_proxy.c | 998 ++++++++++++++++++++++++++++++++++++ src/nxt_conn_read.c | 241 +++++++++ src/nxt_conn_write.c | 422 +++++++++++++++ src/nxt_controller.c | 66 +-- src/nxt_devpoll_engine.c | 2 +- src/nxt_epoll_engine.c | 97 ++-- src/nxt_event_conn.c | 152 ------ src/nxt_event_conn.h | 359 ------------- src/nxt_event_conn_accept.c | 413 --------------- src/nxt_event_conn_connect.c | 194 ------- src/nxt_event_conn_job_sendfile.c | 16 +- src/nxt_event_conn_proxy.c | 1017 ------------------------------------- src/nxt_event_conn_read.c | 243 --------- src/nxt_event_conn_write.c | 420 --------------- src/nxt_event_engine.h | 6 +- src/nxt_event_file.h | 18 - src/nxt_eventport_engine.c | 2 +- src/nxt_file_event.h | 18 + src/nxt_kqueue_engine.c | 117 +++-- src/nxt_listen_socket.c | 2 +- src/nxt_macosx_sendfile.c | 3 +- src/nxt_main.h | 9 +- src/nxt_openssl.c | 46 +- src/nxt_poll_engine.c | 2 +- src/nxt_pollset_engine.c | 2 +- src/nxt_router.c | 47 +- src/nxt_runtime.c | 6 +- src/nxt_select_engine.c | 2 +- src/nxt_sendbuf.c | 4 +- src/nxt_sendbuf.h | 16 +- src/nxt_ssltls.h | 3 +- src/nxt_stream_module.c | 10 +- src/nxt_stream_source.c | 10 +- src/nxt_stream_source.h | 2 +- src/nxt_worker_process.c | 16 +- 42 files changed, 3009 insertions(+), 3096 deletions(-) create mode 100644 src/nxt_conn.c create mode 100644 src/nxt_conn.h create mode 100644 src/nxt_conn_accept.c create mode 100644 src/nxt_conn_connect.c create mode 100644 src/nxt_conn_proxy.c create mode 100644 src/nxt_conn_read.c create mode 100644 src/nxt_conn_write.c delete mode 100644 src/nxt_event_conn.c delete mode 100644 src/nxt_event_conn.h delete mode 100644 src/nxt_event_conn_accept.c delete mode 100644 src/nxt_event_conn_connect.c delete mode 100644 src/nxt_event_conn_proxy.c delete mode 100644 src/nxt_event_conn_read.c delete mode 100644 src/nxt_event_conn_write.c delete mode 100644 src/nxt_event_file.h create mode 100644 src/nxt_file_event.h (limited to 'src') diff --git a/src/nxt_application.c b/src/nxt_application.c index 293807b0..a07a9038 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -17,19 +17,19 @@ static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_app_thread(void *ctx); static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, nxt_log_t *log); -static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, +static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); -static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out); +static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out); static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, +static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data); -static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c); +static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c); static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data); @@ -256,8 +256,8 @@ nxt_app_thread(void *ctx) static nxt_app_request_t * nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) { + nxt_conn_t *c; nxt_mem_pool_t *mp; - nxt_event_conn_t *c; nxt_app_request_t *r; mp = nxt_mem_pool_create(1024); @@ -270,7 +270,7 @@ nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) return NULL; } - c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); + c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t)); if (nxt_slow_path(c == NULL)) { return NULL; } @@ -534,7 +534,7 @@ nxt_app_http_process_headers(nxt_app_request_t *r) static void -nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) +nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log) { c->socket.write_ready = 1; @@ -562,8 +562,8 @@ nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) c->read_work_queue = &thr->engine->read_work_queue; c->write_work_queue = &thr->engine->write_work_queue; - nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); - nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); + nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); } @@ -770,7 +770,7 @@ nxt_app_write_finish(nxt_app_request_t *r) static void -nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) +nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out) { nxt_app_buf_t *ab; @@ -785,9 +785,9 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_mem_pool_t *mp; - nxt_event_conn_t *c; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_mem_pool_t *mp; c = obj; b = data; @@ -820,7 +820,7 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) c->write = b; c->write_state = &nxt_app_delivery_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); } @@ -840,8 +840,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b, *next; - nxt_event_conn_t *c; + nxt_buf_t *b, *next; + nxt_conn_t *c; c = obj; @@ -875,8 +875,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_close_state static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b, *bn, *free; - nxt_event_conn_t *c; + nxt_buf_t *b, *bn, *free; + nxt_conn_t *c; nxt_app_request_t *r; nxt_debug(task, "app delivery completion"); @@ -902,7 +902,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) c = r->event_conn; c->write_state = &nxt_app_delivery_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } } @@ -929,7 +929,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -942,7 +942,7 @@ nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -953,7 +953,7 @@ nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) static nxt_msec_t -nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) +nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data) { /* 30000 ms */ return 30000; @@ -961,7 +961,7 @@ nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) static void -nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) +nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c) { if (c->write == NULL) { return; @@ -981,7 +981,7 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_app_request_t *r; c = obj; diff --git a/src/nxt_application.h b/src/nxt_application.h index c2619ded..5fd49667 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -29,7 +29,7 @@ typedef struct { typedef struct { nxt_event_engine_t *engine; nxt_mem_pool_t *mem_pool; - nxt_event_conn_t *event_conn; + nxt_conn_t *event_conn; nxt_log_t *log; nxt_buf_t *output_buf; diff --git a/src/nxt_conn.c b/src/nxt_conn.c new file mode 100644 index 00000000..3f21747e --- /dev/null +++ b/src/nxt_conn.c @@ -0,0 +1,152 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +nxt_conn_io_t nxt_unix_conn_io = { + nxt_conn_io_connect, + nxt_conn_io_accept, + + nxt_conn_io_read, + nxt_conn_io_recvbuf, + nxt_conn_io_recv, + + nxt_conn_io_write, + nxt_event_conn_io_write_chunk, + +#if (NXT_HAVE_LINUX_SENDFILE) + nxt_linux_event_conn_io_sendfile, +#elif (NXT_HAVE_FREEBSD_SENDFILE) + nxt_freebsd_event_conn_io_sendfile, +#elif (NXT_HAVE_MACOSX_SENDFILE) + nxt_macosx_event_conn_io_sendfile, +#elif (NXT_HAVE_SOLARIS_SENDFILEV) + nxt_solaris_event_conn_io_sendfilev, +#elif (NXT_HAVE_AIX_SEND_FILE) + nxt_aix_event_conn_io_send_file, +#elif (NXT_HAVE_HPUX_SENDFILE) + nxt_hpux_event_conn_io_sendfile, +#else + nxt_event_conn_io_sendbuf, +#endif + + nxt_event_conn_io_writev, + nxt_event_conn_io_send, + + nxt_conn_io_shutdown, +}; + + +nxt_conn_t * +nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) +{ + nxt_conn_t *c; + nxt_thread_t *thr; + + c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t)); + if (nxt_slow_path(c == NULL)) { + return NULL; + } + + c->mem_pool = mp; + + c->socket.fd = -1; + + c->socket.log = &c->log; + c->log = *task->log; + + /* The while loop skips possible uint32_t overflow. */ + + while (c->log.ident == 0) { + c->log.ident = nxt_task_next_ident(); + } + + thr = nxt_thread(); + thr->engine->connections++; + + c->task.thread = thr; + c->task.log = &c->log; + c->task.ident = c->log.ident; + c->socket.task = &c->task; + c->read_timer.task = &c->task; + c->write_timer.task = &c->task; + + c->io = thr->engine->event.io; + c->max_chunk = NXT_INT32_T_MAX; + c->sendfile = NXT_CONN_SENDFILE_UNSET; + + c->socket.read_work_queue = &thr->engine->fast_work_queue; + c->socket.write_work_queue = &thr->engine->fast_work_queue; + + nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); + nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + + nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections); + + return c; +} + + +void +nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) +{ + int ret; + socklen_t len; + nxt_conn_t *c; + struct linger linger; + + c = obj; + + nxt_debug(task, "event conn shutdown"); + + if (c->socket.timedout) { + /* + * Resetting of timed out connection on close + * releases kernel memory associated with socket. + * This also causes sending TCP/IP RST to a peer. + */ + linger.l_onoff = 1; + linger.l_linger = 0; + len = sizeof(struct linger); + + ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); + + if (nxt_slow_path(ret != 0)) { + nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E", + c->socket.fd, nxt_socket_errno); + } + } + + c->write_state->close_handler(task, c, data); +} + + +void +nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, + const nxt_conn_state_t *state, nxt_timer_t *timer) +{ + nxt_msec_t value; + + if (state->timer_value != NULL) { + value = state->timer_value(c, state->timer_data); + + if (value != 0) { + timer->handler = state->timer_handler; + nxt_timer_add(engine, timer, value); + } + } +} + + +void +nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq) +{ + c->read_work_queue = wq; + c->write_work_queue = wq; + c->read_timer.work_queue = wq; + c->write_timer.work_queue = wq; +} diff --git a/src/nxt_conn.h b/src/nxt_conn.h new file mode 100644 index 00000000..491531b0 --- /dev/null +++ b/src/nxt_conn.h @@ -0,0 +1,350 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_CONN_H_INCLUDED_ +#define _NXT_CONN_H_INCLUDED_ + + +typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); + + +typedef struct { + nxt_work_handler_t ready_handler; + nxt_work_handler_t close_handler; + nxt_work_handler_t error_handler; + + nxt_work_handler_t timer_handler; + nxt_conn_timer_value_t timer_value; + uintptr_t timer_data; + + uint8_t timer_autoreset; +} nxt_conn_state_t; + + +typedef struct { + double average; + size_t limit; + size_t limit_after; + size_t max_limit; + nxt_msec_t last; +} nxt_event_write_rate_t; + + +typedef struct { + + nxt_work_handler_t connect; + nxt_work_handler_t accept; + + /* + * The read() with NULL c->read buffer waits readiness of a connection + * to avoid allocation of read buffer if the connection will time out + * or will be closed with error. The kqueue-specific read() can also + * detect case if a client did not sent anything and has just closed the + * connection without errors. In the latter case state's close_handler + * is called. + */ + nxt_work_handler_t read; + + ssize_t (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b); + + ssize_t (*recv)(nxt_conn_t *c, void *buf, + size_t size, nxt_uint_t flags); + + /* + * The write() is an interface to write a buffer chain with a given rate + * limit. It calls write_chunk() in a loop and handles write event timer. + */ + nxt_work_handler_t write; + + /* + * The write_chunk() interface writes a buffer chain with a given limit + * and toggles write event. SSL/TLS libraries' write_chunk() interface + * buffers data and calls the library specific send() interface to write + * the buffered data eventually. + */ + ssize_t (*write_chunk)(nxt_conn_t *c, + nxt_buf_t *b, size_t limit); + + /* + * The sendbuf() is an interface for OS-specific sendfile + * implementations or simple writev(). + */ + ssize_t (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b, + size_t limit); + /* + * The writev() is an interface to write several nxt_iobuf_t buffers. + */ + ssize_t (*writev)(nxt_conn_t *c, + nxt_iobuf_t *iob, nxt_uint_t niob); + /* + * The send() is an interface to write a single buffer. SSL/TLS + * libraries' send() interface handles also the libraries' errors. + */ + ssize_t (*send)(nxt_conn_t *c, void *buf, + size_t size); + + nxt_work_handler_t shutdown; +} nxt_conn_io_t; + + +/* + * The nxt_listen_event_t is separated from nxt_listen_socket_t + * because nxt_listen_socket_t is one per process whilst each worker + * thread uses own nxt_listen_event_t. + */ +typedef struct { + /* Must be the first field. */ + nxt_fd_event_t socket; + + nxt_task_t task; + + uint32_t ready; + uint32_t batch; + + /* An accept() interface is cached to minimize memory accesses. */ + nxt_work_handler_t accept; + + nxt_listen_socket_t *listen; + nxt_conn_t *next; /* STUB */; + nxt_work_queue_t *work_queue; + + nxt_timer_t timer; + + nxt_queue_link_t link; +} nxt_listen_event_t; + + +struct nxt_conn_s { + /* + * Must be the first field, since nxt_fd_event_t + * and nxt_conn_t are used interchangeably. + */ + nxt_fd_event_t socket; + + nxt_buf_t *read; + const nxt_conn_state_t *read_state; + nxt_work_queue_t *read_work_queue; + nxt_timer_t read_timer; + + nxt_buf_t *write; + const nxt_conn_state_t *write_state; + nxt_work_queue_t *write_work_queue; + nxt_event_write_rate_t *rate; + nxt_timer_t write_timer; + + nxt_off_t sent; + uint32_t max_chunk; + uint32_t nbytes; + + nxt_conn_io_t *io; + +#if (NXT_SSLTLS || NXT_THREADS) + /* SunC does not support "zero-sized struct/union". */ + + union { +#if (NXT_SSLTLS) + void *ssltls; +#endif +#if (NXT_THREADS) + nxt_thread_pool_t *thread_pool; +#endif + } u; + +#endif + + nxt_mem_pool_t *mem_pool; + + nxt_task_t task; + nxt_log_t log; + + nxt_listen_event_t *listen; + nxt_sockaddr_t *remote; + nxt_sockaddr_t *local; + const char *action; + + uint8_t peek; + uint8_t blocked; /* 1 bit */ + uint8_t delayed; /* 1 bit */ + +#define NXT_CONN_SENDFILE_OFF 0 +#define NXT_CONN_SENDFILE_ON 1 +#define NXT_CONN_SENDFILE_UNSET 3 + + uint8_t sendfile; /* 2 bits */ + uint8_t tcp_nodelay; /* 1 bit */ + + nxt_queue_link_t link; +}; + + +#define nxt_conn_timer_init(ev, c, wq) \ + do { \ + (ev)->work_queue = (wq); \ + (ev)->log = &(c)->log; \ + (ev)->precision = NXT_TIMER_DEFAULT_PRECISION; \ + } while (0) + + +#define nxt_read_timer_conn(ev) \ + nxt_timer_data(ev, nxt_conn_t, read_timer) + + +#define nxt_write_timer_conn(ev) \ + nxt_timer_data(ev, nxt_conn_t, write_timer) + + +#if (NXT_HAVE_UNIX_DOMAIN) + +#define nxt_conn_tcp_nodelay_on(task, c) \ + do { \ + nxt_int_t ret; \ + \ + if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ + TCP_NODELAY, 1); \ + \ + (c)->tcp_nodelay = (ret == NXT_OK); \ + } \ + } while (0) + + +#else + +#define nxt_conn_tcp_nodelay_on(task, c) \ + do { \ + nxt_int_t ret; \ + \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ + TCP_NODELAY, 1); \ + \ + (c)->tcp_nodelay = (ret == NXT_OK); \ + } while (0) + +#endif + + +NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task); +void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c); + +NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, + const nxt_conn_state_t *state, nxt_timer_t *tev); +NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq); + +void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data); +void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data); +nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c); +void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data); +void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data); + +NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task, + nxt_listen_socket_t *ls); +void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, + nxt_conn_t *c); +void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, + const char *accept_syscall, nxt_err_t err); + +void nxt_conn_wait(nxt_conn_t *c); + +void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); +ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, + nxt_uint_t flags); + +void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); +ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, + nxt_iobuf_t *iob, nxt_uint_t niob); +ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, + size_t size); + +size_t nxt_event_conn_write_limit(nxt_conn_t *c); +nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, + nxt_conn_t *c, size_t sent); +ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, + size_t limit); +ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, + nxt_uint_t niob); +ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size); + +NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, + nxt_conn_t *c); + + +#define nxt_conn_connect(engine, c) \ + nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket, \ + c->socket.task, c, c->socket.data) + + +#define nxt_conn_read(engine, c) \ + do { \ + nxt_event_engine_t *e = engine; \ + \ + c->socket.read_work_queue = &e->read_work_queue; \ + \ + nxt_work_queue_add(&e->read_work_queue, c->io->read, \ + c->socket.task, c, c->socket.data); \ + } while (0) + + +#define nxt_conn_write(e, c) \ + do { \ + nxt_event_engine_t *engine = e; \ + \ + c->socket.write_work_queue = &engine->write_work_queue; \ + \ + nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ + c->socket.task, c, c->socket.data); \ + } while (0) + + +extern nxt_conn_io_t nxt_unix_conn_io; + + +typedef struct { + /* + * Client and peer connections are not embedded because already + * existent connections can be switched to the event connection proxy. + */ + nxt_conn_t *client; + nxt_conn_t *peer; + nxt_buf_t *client_buffer; + nxt_buf_t *peer_buffer; + + size_t client_buffer_size; + size_t peer_buffer_size; + + nxt_msec_t client_wait_timeout; + nxt_msec_t connect_timeout; + nxt_msec_t reconnect_timeout; + nxt_msec_t peer_wait_timeout; + nxt_msec_t client_write_timeout; + nxt_msec_t peer_write_timeout; + + uint8_t connected; /* 1 bit */ + uint8_t delayed; /* 1 bit */ + uint8_t retries; /* 8 bits */ + uint8_t retain; /* 2 bits */ + + nxt_work_handler_t completion_handler; +} nxt_conn_proxy_t; + + +NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c); +NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); + + +/* STUB */ +#define nxt_event_conn_t nxt_conn_t +#define nxt_event_conn_state_t nxt_conn_state_t +#define nxt_event_conn_proxy_t nxt_conn_proxy_t +#define nxt_event_conn_read nxt_conn_read +#define nxt_event_conn_write nxt_conn_write +#define nxt_event_conn_close nxt_conn_close + + +#endif /* _NXT_CONN_H_INCLUDED_ */ diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c new file mode 100644 index 00000000..eb0172f4 --- /dev/null +++ b/src/nxt_conn_accept.c @@ -0,0 +1,366 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +/* + * A listen socket handler calls an event facility specific io_accept() + * method. The method accept()s a new connection and then calls + * nxt_event_conn_accept() to handle the new connection and to prepare + * for a next connection to avoid just dropping next accept()ed socket + * if no more connections allowed. If there are no available connections + * an idle connection would be closed. If there are no idle connections + * then new connections will not be accept()ed for 1 second. + */ + + +static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, + void *data); +static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, + nxt_listen_event_t *lev); +static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, + void *data); + + +nxt_listen_event_t * +nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) +{ + nxt_listen_event_t *lev; + nxt_event_engine_t *engine; + + lev = nxt_zalloc(sizeof(nxt_listen_event_t)); + + if (nxt_fast_path(lev != NULL)) { + lev->socket.fd = ls->socket; + + engine = task->thread->engine; + lev->batch = engine->batch; + + lev->socket.read_work_queue = &engine->accept_work_queue; + lev->socket.read_handler = nxt_conn_listen_handler; + lev->socket.error_handler = nxt_conn_listen_event_error; + lev->socket.log = &nxt_main_log; + + lev->accept = engine->event.io->accept; + + lev->listen = ls; + lev->work_queue = &engine->read_work_queue; + + lev->timer.work_queue = &engine->fast_work_queue; + lev->timer.handler = nxt_conn_listen_timer_handler; + lev->timer.log = &nxt_main_log; + + lev->task.thread = task->thread; + lev->task.log = &nxt_main_log; + lev->task.ident = nxt_task_next_ident(); + lev->socket.task = &lev->task; + lev->timer.task = &lev->task; + + if (nxt_conn_accept_alloc(task, lev) != NULL) { + nxt_fd_event_enable_accept(engine, &lev->socket); + + nxt_queue_insert_head(&engine->listen_connections, &lev->link); + } + + return lev; + } + + return NULL; +} + + +static nxt_conn_t * +nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_sockaddr_t *sa, *remote; + nxt_mem_pool_t *mp; + nxt_event_engine_t *engine; + nxt_listen_socket_t *ls; + + engine = task->thread->engine; + + if (engine->connections < engine->max_connections) { + + mp = nxt_mem_pool_create(lev->listen->mem_pool_size); + + if (nxt_fast_path(mp != NULL)) { + /* This allocation cannot fail. */ + c = nxt_conn_create(mp, lev->socket.task); + + lev->next = c; + c->socket.read_work_queue = lev->socket.read_work_queue; + c->socket.write_ready = 1; + c->listen = lev; + + ls = lev->listen; + /* This allocation cannot fail. */ + remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); + c->remote = remote; + + sa = ls->sockaddr; + remote->type = sa->type; + /* + * Set address family for unspecified Unix domain, + * because these sockaddr's are not be passed to accept(). + */ + remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; + + return c; + } + } + + return NULL; +} + + +static void +nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_listen_event_t *lev; + + lev = obj; + lev->ready = lev->batch; + + lev->accept(task, lev, data); +} + + +void +nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) +{ + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; + + lev = obj; + c = lev->next; + + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); + + len = c->remote->socklen; + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept(lev->socket.fd, sa, &len); + + if (s == -1) { + nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); + return; + } + + c->socket.fd = s; + +#if (NXT_LINUX) + /* + * Linux does not inherit non-blocking mode + * from listen socket for accept()ed socket. + */ + if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { + nxt_socket_close(task, s); + } + +#endif + + nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); + + nxt_conn_accept(task, lev, c); +} + + +void +nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) +{ + nxt_conn_t *next; + + nxt_sockaddr_text(c->remote); + + nxt_debug(task, "client: %*s", + c->remote->address_length, nxt_sockaddr_address(c->remote)); + + nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); + + c->read_work_queue = lev->work_queue; + c->write_work_queue = lev->work_queue; + + if (lev->listen->read_after_accept) { + + //c->socket.read_ready = 1; +// lev->listen->handler(task, c, lev->socket.data); + nxt_work_queue_add(c->read_work_queue, lev->listen->handler, + task, c, lev->socket.data); + + } else { + nxt_work_queue_add(c->write_work_queue, lev->listen->handler, + task, c, lev->socket.data); + } + + next = nxt_conn_accept_next(task, lev); + + if (next != NULL && lev->socket.read_ready) { + nxt_work_queue_add(lev->socket.read_work_queue, + lev->accept, task, lev, next); + } +} + + +static nxt_conn_t * +nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + + lev->next = NULL; + + do { + c = nxt_conn_accept_alloc(task, lev); + + if (nxt_fast_path(c != NULL)) { + return c; + } + + } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); + + nxt_log(task, NXT_LOG_CRIT, "no available connections, " + "new connections are not accepted within 1s"); + + return NULL; +} + + +static nxt_int_t +nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_queue_t *idle; + nxt_queue_link_t *link; + nxt_event_engine_t *engine; + + static nxt_log_moderation_t nxt_idle_close_log_moderation = { + NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION + }; + + engine = task->thread->engine; + + idle = &engine->idle_connections; + + for (link = nxt_queue_last(idle); + link != nxt_queue_head(idle); + link = nxt_queue_next(link)) + { + c = nxt_queue_link_data(link, nxt_conn_t, link); + + if (!c->socket.read_ready) { + nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, + task->log, "no available connections, " + "close idle connection"); + nxt_queue_remove(link); + nxt_conn_close(engine, c); + + return NXT_OK; + } + } + + nxt_timer_add(engine, &lev->timer, 1000); + + nxt_fd_event_disable_read(engine, &lev->socket); + + return NXT_DECLINED; +} + + +void +nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, + const char *accept_syscall, nxt_err_t err) +{ + static nxt_log_moderation_t nxt_accept_log_moderation = { + NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION + }; + + lev->socket.read_ready = 0; + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); + return; + + case ECONNABORTED: + nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, + task->log, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + + case EMFILE: + case ENFILE: + case ENOBUFS: + case ENOMEM: + if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " + "new connections are not accepted within 1s", + accept_syscall, lev->socket.fd, err); + } + + return; + + default: + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + } +} + + +static void +nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_listen_event_t *lev; + + timer = obj; + + lev = nxt_timer_data(timer, nxt_listen_event_t, timer); + c = lev->next; + + if (c == NULL) { + c = nxt_conn_accept_next(task, lev); + + if (c == NULL) { + return; + } + } + + nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); + + lev->accept(task, lev, c); +} + + +static void +nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_fd_event_t *ev; + + ev = obj; + + nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); +} diff --git a/src/nxt_conn_close.c b/src/nxt_conn_close.c index 18b2450c..fb86d052 100644 --- a/src/nxt_conn_close.c +++ b/src/nxt_conn_close.c @@ -16,7 +16,7 @@ static void nxt_conn_close_error_ignore(nxt_task_t *task, void *obj, void -nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) +nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c) { int ret; socklen_t len; @@ -69,7 +69,7 @@ nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; c = obj; @@ -90,7 +90,7 @@ static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) { nxt_uint_t events_pending, timers_pending; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; c = obj; @@ -132,12 +132,12 @@ nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *timer; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; timer = obj; - c = nxt_event_write_timer_conn(timer); + c = nxt_write_timer_conn(timer); nxt_debug(task, "conn close timer handler fd:%d", c->socket.fd); diff --git a/src/nxt_conn_connect.c b/src/nxt_conn_connect.c new file mode 100644 index 00000000..94d25c30 --- /dev/null +++ b/src/nxt_conn_connect.c @@ -0,0 +1,194 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +void +nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_work_handler_t handler; + + c = obj; + + if (nxt_conn_socket(task, c) == NXT_OK) { + c->socket.write_work_queue = c->write_work_queue; + handler = c->io->connect; + + } else { + handler = c->write_state->error_handler; + } + + nxt_work_queue_add(&task->thread->engine->connect_work_queue, + handler, task, c, data); +} + + +void +nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_work_handler_t handler; + nxt_event_engine_t *engine; + const nxt_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (nxt_socket_connect(task, c->socket.fd, c->remote)) { + + case NXT_OK: + c->socket.write_ready = 1; + handler = state->ready_handler; + break; + + case NXT_AGAIN: + c->socket.write_handler = nxt_conn_connect_test; + c->socket.error_handler = state->error_handler; + + engine = task->thread->engine; + + nxt_conn_timer(engine, c, state, &c->write_timer); + + nxt_fd_event_enable_write(engine, &c->socket); + return; + + case NXT_DECLINED: + handler = state->close_handler; + break; + + default: /* NXT_ERROR */ + handler = state->error_handler; + break; + } + + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); +} + + +nxt_int_t +nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_uint_t family; + nxt_socket_t s; + + nxt_debug(task, "event conn socket"); + + family = c->remote->u.sockaddr.sa_family; + + s = nxt_socket_create(task, family, c->remote->type, 0, NXT_NONBLOCK); + + if (nxt_slow_path(s == -1)) { + return NXT_ERROR; + } + + c->sendfile = 1; + +#if (NXT_HAVE_UNIX_DOMAIN && NXT_SOLARIS) + + if (family == AF_UNIX) { + /* Solaris AF_UNIX does not support sendfilev(). */ + c->sendfile = 0; + } + +#endif + + c->socket.fd = s; + + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + + if (c->local != NULL) { + if (nxt_slow_path(nxt_socket_bind(task, s, c->local, 0) != NXT_OK)) { + nxt_socket_close(task, s); + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +void +nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) +{ + int ret, err; + socklen_t len; + nxt_conn_t *c; + + c = obj; + + nxt_debug(task, "event connect test fd:%d", c->socket.fd); + + nxt_fd_event_block_write(task->thread->engine, &c->socket); + + if (c->write_state->timer_autoreset) { + nxt_timer_disable(task->thread->engine, &c->write_timer); + } + + err = 0; + len = sizeof(int); + + /* + * Linux and BSDs return 0 and store a pending error in the err argument; + * Solaris returns -1 and sets the errno. + */ + + ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len); + + if (nxt_slow_path(ret == -1)) { + err = nxt_errno; + } + + if (err == 0) { + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); + return; + } + + c->socket.error = err; + + nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", + c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote)); + + nxt_conn_connect_error(task, c, data); +} + + +void +nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_work_handler_t handler; + const nxt_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (c->socket.error) { + + case NXT_ECONNREFUSED: +#if (NXT_LINUX) + case NXT_EAGAIN: + /* + * Linux returns EAGAIN instead of ECONNREFUSED + * for UNIX sockets if a listen queue is full. + */ +#endif + handler = state->close_handler; + break; + + default: + handler = state->error_handler; + break; + } + + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); +} diff --git a/src/nxt_conn_proxy.c b/src/nxt_conn_proxy.c new file mode 100644 index 00000000..dec23684 --- /dev/null +++ b/src/nxt_conn_proxy.c @@ -0,0 +1,998 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source); +static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, + void *data); +static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); +static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); +static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; +static const nxt_conn_state_t nxt_conn_proxy_client_write_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; + + +nxt_conn_proxy_t * +nxt_conn_proxy_create(nxt_conn_t *client) +{ + nxt_conn_t *peer; + nxt_thread_t *thr; + nxt_conn_proxy_t *p; + + p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t)); + if (nxt_slow_path(p == NULL)) { + return NULL; + } + + peer = nxt_conn_create(client->mem_pool, client->socket.task); + if (nxt_slow_path(peer == NULL)) { + return NULL; + } + + thr = nxt_thread(); + + client->read_work_queue = &thr->engine->read_work_queue; + client->write_work_queue = &thr->engine->write_work_queue; + client->socket.read_work_queue = &thr->engine->read_work_queue; + client->socket.write_work_queue = &thr->engine->write_work_queue; + peer->socket.read_work_queue = &thr->engine->read_work_queue; + peer->socket.write_work_queue = &thr->engine->write_work_queue; + + peer->socket.data = client->socket.data; + + peer->read_work_queue = client->read_work_queue; + peer->write_work_queue = client->write_work_queue; + peer->read_timer.work_queue = client->read_work_queue; + peer->write_timer.work_queue = client->write_work_queue; + + p->client = client; + p->peer = peer; + + return p; +} + + +void +nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_conn_t *peer; + + /* + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + + if (p->client_wait_timeout == 0) { + /* + * Peer write event: waiting for connection + * to be established with connect_timeout. + */ + peer = p->peer; + peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, peer); + } + + /* + * Client read event: waiting for client data with + * client_wait_timeout before buffer allocation. + */ + p->client->read_state = &nxt_conn_proxy_client_wait_state; + + nxt_conn_wait(p->client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_buffer_alloc, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), +}; + + +static void +nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); + + b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->client_buffer = b; + client->read = b; + + if (p->peer->socket.fd != -1) { + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * or blocked after the connection has established. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + } else { + /* + * Client read event: waiting for data with client_wait_timeout + * before connecting to a peer. + * Client write event: blocked. + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + client->read_state = &nxt_conn_proxy_client_first_read_state; + } + + nxt_conn_read(task->thread->engine, client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_connect, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * with connect_timeout. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + p->peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_connected, + .close_handler = nxt_conn_proxy_refused, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client, *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); + + p->connected = 1; + + nxt_conn_tcp_nodelay_on(task, peer); + nxt_conn_tcp_nodelay_on(task, p->client); + + /* Peer read event: waiting with peer_wait_timeout. */ + + peer->read_state = &nxt_conn_proxy_peer_wait_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + nxt_conn_wait(peer); + + if (p->client_buffer != NULL) { + client = p->client; + + client->read_state = &nxt_conn_proxy_client_read_state; + client->write_state = &nxt_conn_proxy_client_write_state; + /* + * Send a client read data to the connected peer. + * Client write event: blocked. + */ + nxt_conn_proxy_read_process(task, p, client, peer); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), +}; + + +static void +nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); + + b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->peer_buffer = b; + peer->read = b; + + p->client->write_state = &nxt_conn_proxy_client_write_state; + peer->read_state = &nxt_conn_proxy_peer_read_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: waiting with possible peer_wait_timeout. + * Peer write event: blocked. + */ + nxt_conn_read(task->thread->engine, peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); + + nxt_conn_proxy_read_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_read_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *rb, *wb; + + if (sink->socket.error != 0) { + nxt_debug(task, "conn proxy sink fd:%d error:%d", + sink->socket.fd, sink->socket.error); + + nxt_conn_proxy_write_error(task, sink, sink->socket.data); + return; + } + + while (source->read != NULL) { + + rb = source->read; + + if (rb->mem.pos != rb->mem.free) { + + /* Add a read part to a write chain. */ + + wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); + if (wb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + wb->mem.pos = rb->mem.pos; + wb->mem.free = rb->mem.free; + wb->mem.start = rb->mem.pos; + wb->mem.end = rb->mem.free; + + rb->mem.pos = rb->mem.free; + rb->mem.start = rb->mem.free; + + nxt_conn_proxy_write_add(sink, wb); + } + + if (rb->mem.start != rb->mem.end) { + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); + break; + } + + source->read = rb->next; + nxt_buf_free(source->mem_pool, rb); + } + + if (p->connected) { + nxt_conn_write(task->thread->engine, sink); + } +} + + +static void +nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second, *prev; + + first = c->write; + + if (first == NULL) { + c->write = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in write chain at any time, because an added buffer is coalesced + * with the last buffer if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.end != b->mem.start) { + first->next = b; + return; + } + + /* + * The first buffer is just before the added buffer, so + * expand the first buffer to the end of the added buffer. + */ + prev = first; + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy write: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * "second->mem.end == b->mem.start" must be always true here, + * that is the second buffer is just before the added buffer, + * so expand the second buffer to the end of added buffer. + */ + prev = second; + } + + prev->mem.free = b->mem.end; + prev->mem.end = b->mem.end; + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); + + if (!source->socket.closed) { + sink = (source == p->client) ? p->peer : p->client; + + if (sink->socket.error == 0) { + nxt_conn_read(task->thread->engine, source); + } + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); + + nxt_conn_proxy_write_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_write_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source) +{ + nxt_buf_t *rb, *wb; + + while (sink->write != NULL) { + + wb = sink->write; + + if (nxt_buf_is_sync(wb)) { + + /* A sync buffer marks the end of stream. */ + + sink->write = NULL; + nxt_buf_free(sink->mem_pool, wb); + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + if (wb->mem.start != wb->mem.pos) { + + /* Add a written part to a read chain. */ + + rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); + if (rb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + rb->mem.pos = wb->mem.start; + rb->mem.free = wb->mem.start; + rb->mem.start = wb->mem.start; + rb->mem.end = wb->mem.pos; + + wb->mem.start = wb->mem.pos; + + nxt_conn_proxy_read_add(source, rb); + } + + if (wb->mem.pos != wb->mem.free) { + nxt_conn_write(task->thread->engine, sink); + + break; + } + + sink->write = wb->next; + nxt_buf_free(sink->mem_pool, wb); + } + + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); +} + + +static void +nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second; + + first = c->read; + + if (first == NULL) { + c->read = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in read chain at any time, because an added buffer is coalesced + * with the last buffer if possible. The first and the second + * buffers are also coalesced if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.start == b->mem.end) { + /* + * The added buffer is just before the first buffer, so expand + * the first buffer to the beginning of the added buffer. + */ + first->mem.pos = b->mem.start; + first->mem.free = b->mem.start; + first->mem.start = b->mem.start; + + } else if (first->mem.end == b->mem.start) { + /* + * The added buffer is just after the first buffer, so + * expand the first buffer to the end of the added buffer. + */ + first->mem.end = b->mem.end; + + } else { + first->next = b; + return; + } + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy read: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * The added buffer is just after the second buffer, so + * expand the second buffer to the end of the added buffer. + */ + second->mem.end = b->mem.end; + + if (first->mem.start == second->mem.end) { + /* + * The second buffer is just before the first buffer, so expand + * the first buffer to the beginning of the second buffer. + */ + first->mem.pos = second->mem.start; + first->mem.free = second->mem.start; + first->mem.start = second->mem.start; + first->next = NULL; + + nxt_buf_free(c->mem_pool, second); + } + } + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); + + sink = (source == p->client) ? p->peer : p->client; + + if (sink->write == NULL) { + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + b = nxt_buf_sync_alloc(source->mem_pool, 0); + if (b == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_buf_chain_add(&sink->write, b); +} + + +static void +nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_read_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static void +nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_write_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static nxt_msec_t +nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_msec_t *timer; + nxt_conn_proxy_t *p; + + p = c->socket.data; + + timer = (nxt_msec_t *) ((char *) p + data); + + return *timer; +} + + +static void +nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); + + if (p->retries == 0) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->retries--; + + nxt_socket_close(task, peer->socket.fd); + peer->socket.fd = -1; + peer->socket.error = 0; + + p->delayed = 1; + + peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; + nxt_timer_add(task->thread->engine, &peer->write_timer, + p->reconnect_timeout); +} + + +static void +nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_timer_t *timer; + nxt_conn_proxy_t *p; + + timer = obj; + + nxt_debug(task, "conn proxy reconnect timer"); + + peer = nxt_write_timer_conn(timer); + p = peer->socket.data; + + if (p->client->socket.closed) { + nxt_conn_proxy_complete(task, p); + return; + } + + p->delayed = 0; + + peer->write_state = &nxt_conn_proxy_peer_connect_state; + /* + * Peer read event: disabled. + * Peer write event: waiting for connection with connect_timeout. + */ + nxt_conn_connect(task->thread->engine, peer); +} + + +static void +nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *b; + + nxt_debug(source->socket.task, + "conn proxy shutdown source fd:%d cl:%d err:%d", + source->socket.fd, source->socket.closed, source->socket.error); + + nxt_debug(sink->socket.task, + "conn proxy shutdown sink fd:%d cl:%d err:%d", + sink->socket.fd, sink->socket.closed, sink->socket.error); + + if (!p->connected || p->delayed) { + nxt_conn_proxy_complete(task, p); + return; + } + + if (sink->socket.error == 0 && !sink->socket.closed) { + sink->socket.shutdown = 1; + nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); + } + + if (sink->socket.error != 0 + || (sink->socket.closed && source->write == NULL)) + { + /* The opposite direction also has been already closed. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_debug(source->socket.task, "free source buffer"); + + /* Free the direction's buffer. */ + b = (source == p->client) ? p->client_buffer : p->peer_buffer; + nxt_mem_free(source->mem_pool, b); +} + + +static void +nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + sink = obj; + p = data; + + nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); + + /* Clear data for the direction sink. */ + sink->write = NULL; + + /* Block the direction source. */ + source = (sink == p->client) ? p->peer : p->client; + nxt_fd_event_block_read(task->thread->engine, &source->socket); + + if (source->write == NULL) { + /* + * There is no data for the opposite direction and + * the next read from the sink will most probably fail. + */ + nxt_conn_proxy_complete(task, p); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_completion, +}; + + +static void +nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_event_engine_t *engine; + + engine = task->thread->engine; + + nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", + p->client->socket.fd, p->peer->socket.fd); + + if (p->delayed) { + p->delayed = 0; + nxt_queue_remove(&p->peer->link); + } + + if (p->client->socket.fd != -1) { + p->retain = 1; + p->client->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->client); + } + + if (p->peer->socket.fd != -1) { + p->retain++; + p->peer->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->peer); + } +} + + +static void +nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_proxy_t *p; + + p = data; + + nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", + p->retain, p->client->socket.fd, p->peer->socket.fd); + + p->retain--; + + if (p->retain == 0) { + nxt_mem_free(p->client->mem_pool, p->client_buffer); + nxt_mem_free(p->client->mem_pool, p->peer_buffer); + + p->completion_handler(task, p, NULL); + } +} diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c new file mode 100644 index 00000000..e7134b99 --- /dev/null +++ b/src/nxt_conn_read.c @@ -0,0 +1,241 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +void +nxt_conn_wait(nxt_conn_t *c) +{ + nxt_event_engine_t *engine; + const nxt_conn_state_t *state; + + nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", + c->socket.fd, c->socket.read_ready); + + engine = c->socket.task->thread->engine; + state = c->read_state; + + if (c->socket.read_ready) { + nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, + c->socket.task, c, c->socket.data); + return; + } + + c->socket.read_handler = state->ready_handler; + c->socket.error_handler = state->error_handler; + + nxt_conn_timer(engine, c, state, &c->read_timer); + + nxt_fd_event_enable_read(engine, &c->socket); +} + + +void +nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_work_queue_t *wq; + nxt_event_engine_t *engine; + nxt_work_handler_t handler; + const nxt_conn_state_t *state; + + c = obj; + + nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", + c->socket.fd, c->socket.read_ready, c->socket.closed); + + engine = task->thread->engine; + + state = c->read_state; + + if (c->socket.read_ready) { + + b = c->read; + + if (c->peek == 0) { + n = c->io->recvbuf(c, b); + + } else { + n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK); + } + + if (n > 0) { + c->nbytes = n; + + nxt_recvbuf_update(b, n); + + nxt_fd_event_block_read(engine, &c->socket); + + if (state->timer_autoreset) { + nxt_timer_disable(engine, &c->read_timer); + } + + wq = c->read_work_queue; + handler = state->ready_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; + } + + if (n != NXT_AGAIN) { + nxt_fd_event_block_read(engine, &c->socket); + nxt_timer_disable(engine, &c->read_timer); + + wq = &engine->fast_work_queue; + + handler = (n == 0) ? state->close_handler : state->error_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; + } + } + + /* + * Here c->io->read() is assigned instead of direct nxt_conn_io_read() + * because the function can be called by nxt_kqueue_conn_io_read(). + */ + c->socket.read_handler = c->io->read; + c->socket.error_handler = state->error_handler; + + if (c->read_timer.state == NXT_TIMER_DISABLED + || nxt_fd_event_is_disabled(c->socket.read)) + { + /* Timer may be set or reset. */ + nxt_conn_timer(engine, c, state, &c->read_timer); + + if (nxt_fd_event_is_disabled(c->socket.read)) { + nxt_fd_event_enable_read(engine, &c->socket); + } + } + + return; +} + + +ssize_t +nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) +{ + ssize_t n; + nxt_err_t err; + nxt_uint_t niov; + struct iovec iov[NXT_IOBUF_MAX]; + nxt_recvbuf_coalesce_t rb; + + rb.buf = b; + rb.iobuf = iov; + rb.nmax = NXT_IOBUF_MAX; + rb.size = 0; + + niov = nxt_recvbuf_mem_coalesce(&rb); + + if (niov == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); + } + + for ( ;; ) { + n = readv(c->socket.fd, iov, niov); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); + + if (n > 0) { + if ((size_t) n < rb.size) { + c->socket.read_ready = 0; + } + + return n; + } + + if (n == 0) { + c->socket.closed = 1; + c->socket.read_ready = 0; + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(c->socket.task, "readv() %E", err); + c->socket.read_ready = 0; + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(c->socket.task, "readv() %E", err); + continue; + + default: + c->socket.error = err; + nxt_log(c->socket.task, nxt_socket_error_level(err), + "readv(%d, %ui) failed %E", c->socket.fd, niov, err); + + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = recv(c->socket.fd, buf, size, flags); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", + c->socket.fd, buf, size, flags, n); + + if (n > 0) { + if ((size_t) n < size) { + c->socket.read_ready = 0; + } + + return n; + } + + if (n == 0) { + c->socket.closed = 1; + c->socket.read_ready = 0; + + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(c->socket.task, "recv() %E", err); + c->socket.read_ready = 0; + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(c->socket.task, "recv() %E", err); + continue; + + default: + c->socket.error = err; + nxt_log(c->socket.task, nxt_socket_error_level(err), + "recv(%d, %p, %uz, %ui) failed %E", + c->socket.fd, buf, size, flags, err); + + return NXT_ERROR; + } + } +} diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c new file mode 100644 index 00000000..a2a5737b --- /dev/null +++ b/src/nxt_conn_write.c @@ -0,0 +1,422 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, + void *data); + + +void +nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) +{ + ssize_t ret; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_sendbuf_t sb; + nxt_event_engine_t *engine; + + c = obj; + + nxt_debug(task, "conn write fd:%d", c->socket.fd); + + if (!c->socket.write_ready || c->write == NULL) { + return; + } + + engine = task->thread->engine; + + c->socket.write_handler = nxt_conn_io_write; + c->socket.error_handler = c->write_state->error_handler; + + b = c->write; + + sb.socket = c->socket.fd; + sb.error = 0; + sb.sent = 0; + sb.size = 0; + sb.buf = b; + sb.limit = 10 * 1024 * 1024; + sb.ready = 1; + sb.sync = 0; + + do { + ret = nxt_conn_io_sendbuf(task, &sb); + + c->socket.write_ready = sb.ready; + c->socket.error = sb.error; + + if (ret < 0) { + /* ret == NXT_AGAIN || ret == NXT_ERROR. */ + break; + } + + sb.sent += ret; + sb.limit -= ret; + + b = nxt_sendbuf_update(b, ret); + + if (b == NULL) { + nxt_fd_event_block_write(engine, &c->socket); + break; + } + + sb.buf = b; + + if (!c->socket.write_ready) { + ret = NXT_AGAIN; + break; + } + + } while (sb.limit != 0); + + nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent); + + if (sb.sent != 0) { + if (c->write_state->timer_autoreset) { + nxt_timer_disable(engine, &c->write_timer); + } + } + + if (ret != NXT_ERROR) { + + if (sb.limit == 0) { + /* + * Postpone writing until next event poll to allow to + * process other recevied events and to get new events. + */ + c->write_timer.handler = nxt_conn_write_timer_handler; + nxt_timer_add(engine, &c->write_timer, 0); + + } else if (ret == NXT_AGAIN) { + /* + * SSL libraries can require to toggle either write or read + * event if renegotiation occurs during SSL write operation. + * This case is handled on the event_io->send() level. Timer + * can be set here because it should be set only for write + * direction. + */ + nxt_conn_timer(engine, c, c->write_state, &c->write_timer); + + if (nxt_fd_event_is_disabled(c->socket.write)) { + nxt_fd_event_enable_write(engine, &c->socket); + } + } + } + + if (ret == 0 || sb.sent != 0) { + /* "ret == 0" means a sync buffer was processed. */ + c->sent += sb.sent; + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); + /* + * Fall through if first operations were + * successful but the last one failed. + */ + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_fd_event_block_write(engine, &c->socket); + + nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, + task, c, data); + } +} + + +static void +nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + nxt_debug(task, "event conn conn timer"); + + c = nxt_write_timer_conn(timer); + c->delayed = 0; + + c->io->write(task, c, c->socket.data); +} + + +ssize_t +nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) +{ + nxt_uint_t niov; + struct iovec iov[NXT_IOBUF_MAX]; + + niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); + + if (niov == 0 && sb->sync) { + return 0; + } + + return nxt_conn_io_writev(task, sb, iov, niov); +} + + +ssize_t +nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, + nxt_uint_t niov) +{ + ssize_t n; + nxt_err_t err; + + if (niov == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); + } + + for ( ;; ) { + n = writev(sb->socket, iov, niov); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "writev() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "writev() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", sb->socket, niov, err); + + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = send(sb->socket, buf, size, 0); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "send() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "send() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); + + return NXT_ERROR; + } + } +} + + +/* Obsolete interfaces. */ + +size_t +nxt_event_conn_write_limit(nxt_conn_t *c) +{ + ssize_t limit, correction; + nxt_event_write_rate_t *rate; + + rate = c->rate; + + if (rate == NULL) { + return c->max_chunk; + } + + limit = rate->limit; + correction = limit - (size_t) rate->average; + + nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f", + correction, rate->average); + + limit += correction; + + if (limit <= 0) { + return 0; + } + + if (rate->limit_after != 0) { + limit += rate->limit_after; + limit = nxt_min((size_t) limit, rate->max_limit); + } + + return nxt_min((size_t) limit, c->max_chunk); +} + + +nxt_bool_t +nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c, + size_t sent) +{ + return 0; +} + + +ssize_t +nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit) +{ + ssize_t ret; + + ret = c->io->sendbuf(c, b, limit); + + if ((ret == NXT_AGAIN || !c->socket.write_ready) + && nxt_fd_event_is_disabled(c->socket.write)) + { + nxt_fd_event_enable_write(c->socket.task->thread->engine, &c->socket); + } + + return ret; +} + + +ssize_t +nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit) +{ + nxt_uint_t niob; + struct iovec iob[NXT_IOBUF_MAX]; + nxt_sendbuf_coalesce_t sb; + + sb.buf = b; + sb.iobuf = iob; + sb.nmax = NXT_IOBUF_MAX; + sb.sync = 0; + sb.size = 0; + sb.limit = limit; + + niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); + + if (niob == 0 && sb.sync) { + return 0; + } + + return nxt_event_conn_io_writev(c, iob, niob); +} + + +ssize_t +nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) +{ + ssize_t n; + nxt_err_t err; + + if (niob == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len); + } + + for ( ;; ) { + n = writev(c->socket.fd, iob, niob); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(c->socket.task, "writev() %E", err); + c->socket.write_ready = 0; + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(c->socket.task, "writev() %E", err); + continue; + + default: + c->socket.error = err; + nxt_log(c->socket.task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", c->socket.fd, niob, err); + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = send(c->socket.fd, buf, size, 0); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(c->socket.task, "send(%d, %p, %uz): %z", + c->socket.fd, buf, size, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(c->socket.task, "send() %E", err); + c->socket.write_ready = 0; + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(c->socket.task, "send() %E", err); + continue; + + default: + c->socket.error = err; + nxt_log(c->socket.task, nxt_socket_error_level(err), + "send(%d, %p, %uz) failed %E", + c->socket.fd, buf, size, err); + return NXT_ERROR; + } + } +} diff --git a/src/nxt_controller.c b/src/nxt_controller.c index e31e2e15..6ec4c744 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -34,7 +34,7 @@ typedef struct { static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c, +static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data); @@ -54,8 +54,8 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, uintptr_t data, nxt_log_t *log); static void nxt_controller_process_request(nxt_task_t *task, - nxt_event_conn_t *c, nxt_controller_request_t *r); -static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, + nxt_conn_t *c, nxt_controller_request_t *r); +static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, nxt_controller_response_t *resp); static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp, nxt_mem_pool_t *pool); @@ -184,7 +184,7 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) */ ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { @@ -201,7 +201,7 @@ static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_controller_request_t *r; @@ -237,7 +237,7 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) c->read_work_queue = &engine->read_work_queue; c->write_work_queue = &engine->write_work_queue; - nxt_event_conn_read(engine, c); + nxt_conn_read(engine, c); } @@ -260,7 +260,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) size_t preread; nxt_buf_t *b; nxt_int_t rc; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_controller_request_t *r; c = obj; @@ -284,7 +284,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); return; } @@ -329,12 +329,12 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_controller_conn_body_read_state; - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); } static nxt_msec_t -nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { return (nxt_msec_t) data; } @@ -343,7 +343,7 @@ nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -356,12 +356,12 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *ev; - nxt_event_conn_t *c; + nxt_timer_t *timer; + nxt_conn_t *c; - ev = obj; + timer = obj; - c = nxt_event_read_timer_conn(ev); + c = nxt_read_timer_conn(timer); c->socket.timedout = 1; c->socket.closed = 1; @@ -388,9 +388,9 @@ static const nxt_event_conn_state_t nxt_controller_conn_body_read_state static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) { - size_t rest; - nxt_buf_t *b; - nxt_event_conn_t *c; + size_t rest; + nxt_buf_t *b; + nxt_conn_t *c; c = obj; @@ -409,7 +409,7 @@ nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "controller conn body read again, rest: %uz", rest); - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); } @@ -429,8 +429,8 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_buf_t *b; + nxt_conn_t *c; c = obj; @@ -439,7 +439,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) b = c->write; if (b->mem.pos != b->mem.free) { - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); return; } @@ -452,7 +452,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -465,12 +465,12 @@ nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *ev; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; - ev = obj; + timer = obj; - c = nxt_event_write_timer_conn(ev); + c = nxt_write_timer_conn(timer); c->socket.timedout = 1; c->socket.closed = 1; @@ -490,7 +490,7 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -500,14 +500,14 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_controller_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -544,7 +544,7 @@ nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, static void -nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c, +nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, nxt_controller_request_t *req) { nxt_int_t rc; @@ -737,7 +737,7 @@ done: static nxt_int_t -nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, +nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, nxt_controller_response_t *resp) { size_t size; @@ -765,7 +765,7 @@ nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, c->write = b; c->write_state = &nxt_controller_conn_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); return NXT_OK; } diff --git a/src/nxt_devpoll_engine.c b/src/nxt_devpoll_engine.c index 37435661..faa6368a 100644 --- a/src/nxt_devpoll_engine.c +++ b/src/nxt_devpoll_engine.c @@ -85,7 +85,7 @@ const nxt_event_interface_t nxt_devpoll_engine = { NULL, nxt_devpoll_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 4ca878e6..410c542d 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -38,10 +38,9 @@ static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, - nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, - uint32_t mode); + nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, - nxt_event_conn_io_t *io); + nxt_conn_io_t *io); static void nxt_epoll_free(nxt_event_engine_t *engine); static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); @@ -83,28 +82,27 @@ static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); #if (NXT_HAVE_ACCEPT4) -static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, +static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data); #endif #if (NXT_HAVE_EPOLL_EDGE) -static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, +static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, +static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, - nxt_buf_t *b); +static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); -static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { - nxt_epoll_edge_event_conn_io_connect, - nxt_event_conn_io_accept, +static nxt_conn_io_t nxt_epoll_edge_conn_io = { + nxt_epoll_edge_conn_io_connect, + nxt_conn_io_accept, - nxt_event_conn_io_read, - nxt_epoll_edge_event_conn_io_recvbuf, - nxt_event_conn_io_recv, + nxt_conn_io_read, + nxt_epoll_edge_conn_io_recvbuf, + nxt_conn_io_recv, nxt_conn_io_write, nxt_event_conn_io_write_chunk, @@ -118,7 +116,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { nxt_event_conn_io_writev, nxt_event_conn_io_send, - nxt_event_conn_io_shutdown, + nxt_conn_io_shutdown, }; @@ -150,7 +148,7 @@ const nxt_event_interface_t nxt_epoll_edge_engine = { #endif nxt_epoll_poll, - &nxt_epoll_edge_event_conn_io, + &nxt_epoll_edge_conn_io, #if (NXT_HAVE_INOTIFY) NXT_FILE_EVENTS, @@ -196,7 +194,7 @@ const nxt_event_interface_t nxt_epoll_level_engine = { #endif nxt_epoll_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, #if (NXT_HAVE_INOTIFY) NXT_FILE_EVENTS, @@ -218,8 +216,7 @@ static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { - return nxt_epoll_create(engine, mchanges, mevents, - &nxt_epoll_edge_event_conn_io, + return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, EPOLLET | EPOLLRDHUP); } @@ -231,13 +228,13 @@ nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { return nxt_epoll_create(engine, mchanges, mevents, - &nxt_unix_event_conn_io, 0); + &nxt_unix_conn_io, 0); } static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, - nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) + nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) { engine->u.epoll.fd = -1; engine->u.epoll.mode = mode; @@ -290,7 +287,7 @@ fail: static void -nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) +nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) { static nxt_work_handler_t handler; @@ -303,7 +300,7 @@ nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); if (nxt_errno != NXT_ENOSYS) { - handler = nxt_epoll_event_conn_io_accept4; + handler = nxt_epoll_conn_io_accept4; } else { nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", @@ -985,19 +982,19 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) #if (NXT_HAVE_ACCEPT4) static void -nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) +nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) { - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; - cls = obj; - c = cls->next; + lev = obj; + c = lev->next; - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); len = c->remote->socklen; @@ -1009,18 +1006,18 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) len = 0; } - s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); + s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK); if (s != -1) { c->socket.fd = s; - nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); - nxt_event_conn_accept(task, cls, c); + nxt_conn_accept(task, lev, c); return; } - nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); + nxt_conn_accept_error(task, lev, "accept4", nxt_errno); } #endif @@ -1039,9 +1036,9 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) */ static void -nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -1058,11 +1055,11 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; case NXT_AGAIN: - c->socket.write_handler = nxt_epoll_edge_event_conn_connected; - c->socket.error_handler = nxt_event_conn_connect_error; + c->socket.write_handler = nxt_epoll_edge_conn_connected; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; @@ -1070,7 +1067,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) #if 0 case NXT_AGAIN: - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); /* Fall through. */ @@ -1111,9 +1108,9 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) static void -nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) +nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1131,22 +1128,22 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_connect_test(task, c, data); + nxt_conn_connect_test(task, c, data); } /* - * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around - * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF + * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around + * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF * in edge-triggered mode. */ static ssize_t -nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; - n = nxt_event_conn_io_recvbuf(c, b); + n = nxt_conn_io_recvbuf(c, b); if (n > 0 && c->socket.epoll_eof) { c->socket.read_ready = 1; diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c deleted file mode 100644 index b8adb00a..00000000 --- a/src/nxt_event_conn.c +++ /dev/null @@ -1,152 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -nxt_event_conn_io_t nxt_unix_event_conn_io = { - nxt_event_conn_io_connect, - nxt_event_conn_io_accept, - - nxt_event_conn_io_read, - nxt_event_conn_io_recvbuf, - nxt_event_conn_io_recv, - - nxt_conn_io_write, - nxt_event_conn_io_write_chunk, - -#if (NXT_HAVE_LINUX_SENDFILE) - nxt_linux_event_conn_io_sendfile, -#elif (NXT_HAVE_FREEBSD_SENDFILE) - nxt_freebsd_event_conn_io_sendfile, -#elif (NXT_HAVE_MACOSX_SENDFILE) - nxt_macosx_event_conn_io_sendfile, -#elif (NXT_HAVE_SOLARIS_SENDFILEV) - nxt_solaris_event_conn_io_sendfilev, -#elif (NXT_HAVE_AIX_SEND_FILE) - nxt_aix_event_conn_io_send_file, -#elif (NXT_HAVE_HPUX_SENDFILE) - nxt_hpux_event_conn_io_sendfile, -#else - nxt_event_conn_io_sendbuf, -#endif - - nxt_event_conn_io_writev, - nxt_event_conn_io_send, - - nxt_event_conn_io_shutdown, -}; - - -nxt_event_conn_t * -nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) -{ - nxt_thread_t *thr; - nxt_event_conn_t *c; - - c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); - if (nxt_slow_path(c == NULL)) { - return NULL; - } - - c->mem_pool = mp; - - c->socket.fd = -1; - - c->socket.log = &c->log; - c->log = *task->log; - - /* The while loop skips possible uint32_t overflow. */ - - while (c->log.ident == 0) { - c->log.ident = nxt_task_next_ident(); - } - - thr = nxt_thread(); - thr->engine->connections++; - - c->task.thread = thr; - c->task.log = &c->log; - c->task.ident = c->log.ident; - c->socket.task = &c->task; - c->read_timer.task = &c->task; - c->write_timer.task = &c->task; - - c->io = thr->engine->event.io; - c->max_chunk = NXT_INT32_T_MAX; - c->sendfile = NXT_CONN_SENDFILE_UNSET; - - c->socket.read_work_queue = &thr->engine->fast_work_queue; - c->socket.write_work_queue = &thr->engine->fast_work_queue; - - nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); - nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); - - nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); - - return c; -} - - -void -nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) -{ - int ret; - socklen_t len; - struct linger linger; - nxt_event_conn_t *c; - - c = obj; - - nxt_debug(task, "event conn shutdown"); - - if (c->socket.timedout) { - /* - * Resetting of timed out connection on close - * releases kernel memory associated with socket. - * This also causes sending TCP/IP RST to a peer. - */ - linger.l_onoff = 1; - linger.l_linger = 0; - len = sizeof(struct linger); - - ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); - - if (nxt_slow_path(ret != 0)) { - nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E", - c->socket.fd, nxt_socket_errno); - } - } - - c->write_state->close_handler(task, c, data); -} - - -void -nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, - const nxt_event_conn_state_t *state, nxt_timer_t *tev) -{ - nxt_msec_t timer; - - if (state->timer_value != NULL) { - timer = state->timer_value(c, state->timer_data); - - if (timer != 0) { - tev->handler = state->timer_handler; - nxt_timer_add(engine, tev, timer); - } - } -} - - -void -nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq) -{ - c->read_work_queue = wq; - c->write_work_queue = wq; - c->read_timer.work_queue = wq; - c->write_timer.work_queue = wq; -} diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h deleted file mode 100644 index 02c088b8..00000000 --- a/src/nxt_event_conn.h +++ /dev/null @@ -1,359 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_EVENT_CONN_H_INCLUDED_ -#define _NXT_EVENT_CONN_H_INCLUDED_ - - -typedef nxt_msec_t (*nxt_event_conn_timer_val_t)(nxt_event_conn_t *c, - uintptr_t data); - - -typedef struct { - nxt_work_handler_t ready_handler; - nxt_work_handler_t close_handler; - nxt_work_handler_t error_handler; - - nxt_work_handler_t timer_handler; - nxt_event_conn_timer_val_t timer_value; - uintptr_t timer_data; - - uint8_t timer_autoreset; -} nxt_event_conn_state_t; - - -typedef struct { - double average; - size_t limit; - size_t limit_after; - size_t max_limit; - nxt_msec_t last; -} nxt_event_write_rate_t; - - -typedef struct { - - nxt_work_handler_t connect; - nxt_work_handler_t accept; - - /* - * The read() with NULL c->read buffer waits readiness of a connection - * to avoid allocation of read buffer if the connection will time out - * or will be closed with error. The kqueue-specific read() can also - * detect case if a client did not sent anything and has just closed the - * connection without errors. In the latter case state's close_handler - * is called. - */ - nxt_work_handler_t read; - - ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b); - - ssize_t (*recv)(nxt_event_conn_t *c, void *buf, - size_t size, nxt_uint_t flags); - - /* - * The write() is an interface to write a buffer chain with a given rate - * limit. It calls write_chunk() in a loop and handles write event timer. - */ - nxt_work_handler_t write; - - /* - * The write_chunk() interface writes a buffer chain with a given limit - * and toggles write event. SSL/TLS libraries' write_chunk() interface - * buffers data and calls the library specific send() interface to write - * the buffered data eventually. - */ - ssize_t (*write_chunk)(nxt_event_conn_t *c, - nxt_buf_t *b, size_t limit); - - /* - * The sendbuf() is an interface for OS-specific sendfile - * implementations or simple writev(). - */ - ssize_t (*sendbuf)(nxt_event_conn_t *c, nxt_buf_t *b, - size_t limit); - /* - * The writev() is an interface to write several nxt_iobuf_t buffers. - */ - ssize_t (*writev)(nxt_event_conn_t *c, - nxt_iobuf_t *iob, nxt_uint_t niob); - /* - * The send() is an interface to write a single buffer. SSL/TLS - * libraries' send() interface handles also the libraries' errors. - */ - ssize_t (*send)(nxt_event_conn_t *c, void *buf, - size_t size); - - nxt_work_handler_t shutdown; -} nxt_event_conn_io_t; - - -/* - * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t - * because nxt_listen_socket_t is one per process whilst each worker - * thread uses own nxt_event_conn_listen_t. - */ -typedef struct { - /* Must be the first field. */ - nxt_fd_event_t socket; - - nxt_task_t task; - - uint32_t ready; - uint32_t batch; - - /* An accept() interface is cached to minimize memory accesses. */ - nxt_work_handler_t accept; - - nxt_listen_socket_t *listen; - nxt_event_conn_t *next; /* STUB */; - nxt_work_queue_t *work_queue; - - nxt_timer_t timer; - - nxt_queue_link_t link; -} nxt_event_conn_listen_t; - -typedef nxt_event_conn_listen_t nxt_listen_event_t; - - -struct nxt_event_conn_s { - /* - * Must be the first field, since nxt_fd_event_t - * and nxt_event_conn_t are used interchangeably. - */ - nxt_fd_event_t socket; - - nxt_buf_t *read; - const nxt_event_conn_state_t *read_state; - nxt_work_queue_t *read_work_queue; - nxt_timer_t read_timer; - - nxt_buf_t *write; - const nxt_event_conn_state_t *write_state; - nxt_work_queue_t *write_work_queue; - nxt_event_write_rate_t *rate; - nxt_timer_t write_timer; - - nxt_off_t sent; - uint32_t max_chunk; - uint32_t nbytes; - - nxt_event_conn_io_t *io; - -#if (NXT_SSLTLS || NXT_THREADS) - /* SunC does not support "zero-sized struct/union". */ - - union { -#if (NXT_SSLTLS) - void *ssltls; -#endif -#if (NXT_THREADS) - nxt_thread_pool_t *thread_pool; -#endif - } u; - -#endif - - nxt_mem_pool_t *mem_pool; - - nxt_task_t task; - nxt_log_t log; - - nxt_event_conn_listen_t *listen; - nxt_sockaddr_t *remote; - nxt_sockaddr_t *local; - const char *action; - - uint8_t peek; - uint8_t blocked; /* 1 bit */ - uint8_t delayed; /* 1 bit */ - -#define NXT_CONN_SENDFILE_OFF 0 -#define NXT_CONN_SENDFILE_ON 1 -#define NXT_CONN_SENDFILE_UNSET 3 - - uint8_t sendfile; /* 2 bits */ - uint8_t tcp_nodelay; /* 1 bit */ - - nxt_queue_link_t link; -}; - - -#define \ -nxt_event_conn_timer_init(ev, c, wq) \ - do { \ - (ev)->work_queue = (wq); \ - (ev)->log = &(c)->log; \ - (ev)->precision = NXT_TIMER_DEFAULT_PRECISION; \ - } while (0) - - -#define \ -nxt_event_read_timer_conn(ev) \ - nxt_timer_data(ev, nxt_event_conn_t, read_timer) - - -#define \ -nxt_event_write_timer_conn(ev) \ - nxt_timer_data(ev, nxt_event_conn_t, write_timer) - - -#if (NXT_HAVE_UNIX_DOMAIN) - -#define \ -nxt_event_conn_tcp_nodelay_on(task, c) \ - do { \ - nxt_int_t ret; \ - \ - if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ - ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ - TCP_NODELAY, 1); \ - \ - (c)->tcp_nodelay = (ret == NXT_OK); \ - } \ - } while (0) - - -#else - -#define \ -nxt_event_conn_tcp_nodelay_on(task, c) \ - do { \ - nxt_int_t ret; \ - \ - ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ - TCP_NODELAY, 1); \ - \ - (c)->tcp_nodelay = (ret == NXT_OK); \ - } while (0) - -#endif - - -NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, - nxt_task_t *task); -void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine, - nxt_event_conn_t *c); - -NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, - nxt_event_conn_t *c, const nxt_event_conn_state_t *state, nxt_timer_t *tev); -NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, - nxt_work_queue_t *wq); - -void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data); -void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); -nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); -void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); -void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data); - -NXT_EXPORT nxt_event_conn_listen_t *nxt_listen_event(nxt_task_t *task, - nxt_listen_socket_t *ls); -NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task, - nxt_listen_socket_t *ls); -void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task, - nxt_event_conn_listen_t *cls, nxt_event_conn_t *c); -void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, - const char *accept_syscall, nxt_err_t err); - -void nxt_conn_wait(nxt_event_conn_t *c); - -void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data); -ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); -ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, - size_t size, nxt_uint_t flags); - -void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); -ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); -ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, - nxt_iobuf_t *iob, nxt_uint_t niob); -ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, - size_t size); - -size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); -nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, - nxt_event_conn_t *c, size_t sent); -ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, - size_t limit); -ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, - nxt_uint_t niob); -ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size); - -NXT_EXPORT void nxt_event_conn_io_close(nxt_task_t *task, void *obj, - void *data); - -NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, - nxt_event_conn_t *c); - - -#define nxt_event_conn_connect(engine, c) \ - nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \ - c->socket.task, c, c->socket.data) - - -#define nxt_event_conn_read(engine, c) \ - do { \ - nxt_event_engine_t *e = engine; \ - \ - c->socket.read_work_queue = &e->read_work_queue; \ - \ - nxt_work_queue_add(&e->read_work_queue, c->io->read, \ - c->socket.task, c, c->socket.data); \ - } while (0) - - -#define nxt_event_conn_write(e, c) \ - do { \ - nxt_event_engine_t *engine = e; \ - \ - c->socket.write_work_queue = &engine->write_work_queue; \ - \ - nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ - c->socket.task, c, c->socket.data); \ - } while (0) - - -extern nxt_event_conn_io_t nxt_unix_event_conn_io; - - -typedef struct { - /* - * Client and peer connections are not embedded because already - * existent connections can be switched to the event connection proxy. - */ - nxt_event_conn_t *client; - nxt_event_conn_t *peer; - nxt_buf_t *client_buffer; - nxt_buf_t *peer_buffer; - - size_t client_buffer_size; - size_t peer_buffer_size; - - nxt_msec_t client_wait_timeout; - nxt_msec_t connect_timeout; - nxt_msec_t reconnect_timeout; - nxt_msec_t peer_wait_timeout; - nxt_msec_t client_write_timeout; - nxt_msec_t peer_write_timeout; - - uint8_t connected; /* 1 bit */ - uint8_t delayed; /* 1 bit */ - uint8_t retries; /* 8 bits */ - uint8_t retain; /* 2 bits */ - - nxt_work_handler_t completion_handler; -} nxt_event_conn_proxy_t; - - -NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create( - nxt_event_conn_t *c); -NXT_EXPORT void nxt_event_conn_proxy(nxt_task_t *task, - nxt_event_conn_proxy_t *p); - - -#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */ diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c deleted file mode 100644 index ced2a3d8..00000000 --- a/src/nxt_event_conn_accept.c +++ /dev/null @@ -1,413 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -/* - * A listen socket handler calls an event facility specific io_accept() - * method. The method accept()s a new connection and then calls - * nxt_event_conn_accept() to handle the new connection and to prepare - * for a next connection to avoid just dropping next accept()ed socket - * if no more connections allowed. If there are no available connections - * an idle connection would be closed. If there are no idle connections - * then new connections will not be accept()ed for 1 second. - */ - - -static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, - void *data); -static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static nxt_int_t nxt_event_conn_accept_close_idle(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, - void *data); - - -nxt_event_conn_listen_t * -nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) -{ - nxt_event_engine_t *engine; - nxt_event_conn_listen_t *cls; - - cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); - - if (nxt_fast_path(cls != NULL)) { - cls->socket.fd = ls->socket; - - engine = task->thread->engine; - cls->batch = engine->batch; - - cls->socket.read_work_queue = &engine->accept_work_queue; - cls->socket.read_handler = nxt_event_conn_listen_handler; - cls->socket.error_handler = nxt_event_conn_listen_event_error; - cls->socket.log = &nxt_main_log; - - cls->accept = engine->event.io->accept; - - cls->listen = ls; - cls->work_queue = &engine->read_work_queue; - - cls->timer.work_queue = &engine->fast_work_queue; - cls->timer.handler = nxt_event_conn_listen_timer_handler; - cls->timer.log = &nxt_main_log; - - cls->task.thread = task->thread; - cls->task.log = &nxt_main_log; - cls->task.ident = nxt_task_next_ident(); - cls->socket.task = &cls->task; - cls->timer.task = &cls->task; - - if (nxt_event_conn_accept_alloc(task, cls) != NULL) { - nxt_fd_event_enable_accept(engine, &cls->socket); - - nxt_queue_insert_head(&engine->listen_connections, &cls->link); - } - - return cls; - } - - return NULL; -} - - -nxt_int_t -nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) -{ - nxt_event_engine_t *engine; - nxt_event_conn_listen_t *cls; - - cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); - - if (nxt_fast_path(cls != NULL)) { - cls->socket.fd = ls->socket; - - engine = task->thread->engine; - cls->batch = engine->batch; - - cls->socket.read_work_queue = &engine->accept_work_queue; - cls->socket.read_handler = nxt_event_conn_listen_handler; - cls->socket.error_handler = nxt_event_conn_listen_event_error; - cls->socket.log = &nxt_main_log; - - cls->accept = engine->event.io->accept; - - cls->listen = ls; - - cls->timer.work_queue = &engine->fast_work_queue; - cls->timer.handler = nxt_event_conn_listen_timer_handler; - cls->timer.log = &nxt_main_log; - - cls->task.thread = task->thread; - cls->task.log = &nxt_main_log; - cls->task.ident = nxt_task_next_ident(); - cls->socket.task = &cls->task; - cls->timer.task = &cls->task; - - if (nxt_event_conn_accept_alloc(task, cls) != NULL) { - nxt_fd_event_enable_accept(engine, &cls->socket); - - nxt_queue_insert_head(&engine->listen_connections, &cls->link); - } - - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_event_conn_t * -nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_sockaddr_t *sa, *remote; - nxt_mem_pool_t *mp; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - nxt_listen_socket_t *ls; - - engine = task->thread->engine; - - if (engine->connections < engine->max_connections) { - - mp = nxt_mem_pool_create(cls->listen->mem_pool_size); - - if (nxt_fast_path(mp != NULL)) { - /* This allocation cannot fail. */ - c = nxt_event_conn_create(mp, cls->socket.task); - - cls->next = c; - c->socket.read_work_queue = cls->socket.read_work_queue; - c->socket.write_ready = 1; - c->listen = cls; - - ls = cls->listen; - /* This allocation cannot fail. */ - remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); - c->remote = remote; - - sa = ls->sockaddr; - remote->type = sa->type; - /* - * Set address family for unspecified Unix domain, - * because these sockaddr's are not be passed to accept(). - */ - remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; - - return c; - } - } - - return NULL; -} - - -static void -nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_listen_t *cls; - - cls = obj; - cls->ready = cls->batch; - - cls->accept(task, cls, data); -} - - -void -nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) -{ - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; - - cls = obj; - c = cls->next; - - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); - - len = c->remote->socklen; - - if (len >= sizeof(struct sockaddr)) { - sa = &c->remote->u.sockaddr; - - } else { - sa = NULL; - len = 0; - } - - s = accept(cls->socket.fd, sa, &len); - - if (s == -1) { - nxt_event_conn_accept_error(task, cls, "accept", nxt_socket_errno); - return; - } - - c->socket.fd = s; - -#if (NXT_LINUX) - /* - * Linux does not inherit non-blocking mode - * from listen socket for accept()ed socket. - */ - if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { - nxt_socket_close(task, s); - } - -#endif - - nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); - - nxt_event_conn_accept(task, cls, c); -} - - -void -nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, - nxt_event_conn_t *c) -{ - nxt_event_conn_t *next; - - nxt_sockaddr_text(c->remote); - - nxt_debug(task, "client: %*s", - c->remote->address_length, nxt_sockaddr_address(c->remote)); - - nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); - - c->read_work_queue = cls->work_queue; - c->write_work_queue = cls->work_queue; - - if (cls->listen->read_after_accept) { - - //c->socket.read_ready = 1; -// cls->listen->handler(task, c, cls->socket.data); - nxt_work_queue_add(c->read_work_queue, cls->listen->handler, - task, c, cls->socket.data); - - } else { - nxt_work_queue_add(c->write_work_queue, cls->listen->handler, - task, c, cls->socket.data); - } - - next = nxt_event_conn_accept_next(task, cls); - - if (next != NULL && cls->socket.read_ready) { - nxt_work_queue_add(cls->socket.read_work_queue, - cls->accept, task, cls, next); - } -} - - -static nxt_event_conn_t * -nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_event_conn_t *c; - - cls->next = NULL; - - do { - c = nxt_event_conn_accept_alloc(task, cls); - - if (nxt_fast_path(c != NULL)) { - return c; - } - - } while (nxt_event_conn_accept_close_idle(task, cls) == NXT_OK); - - nxt_log(task, NXT_LOG_CRIT, "no available connections, " - "new connections are not accepted within 1s"); - - return NULL; -} - - -static nxt_int_t -nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_queue_t *idle; - nxt_queue_link_t *link; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - static nxt_log_moderation_t nxt_idle_close_log_moderation = { - NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION - }; - - engine = task->thread->engine; - - idle = &engine->idle_connections; - - for (link = nxt_queue_last(idle); - link != nxt_queue_head(idle); - link = nxt_queue_next(link)) - { - c = nxt_queue_link_data(link, nxt_event_conn_t, link); - - if (!c->socket.read_ready) { - nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, - task->log, "no available connections, " - "close idle connection"); - nxt_queue_remove(link); - nxt_event_conn_close(engine, c); - - return NXT_OK; - } - } - - nxt_timer_add(engine, &cls->timer, 1000); - - nxt_fd_event_disable_read(engine, &cls->socket); - - return NXT_DECLINED; -} - - -void -nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, - const char *accept_syscall, nxt_err_t err) -{ - static nxt_log_moderation_t nxt_accept_log_moderation = { - NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION - }; - - cls->socket.read_ready = 0; - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(task, "%s(%d) %E", accept_syscall, cls->socket.fd, err); - return; - - case ECONNABORTED: - nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, - task->log, "%s(%d) failed %E", - accept_syscall, cls->socket.fd, err); - return; - - case EMFILE: - case ENFILE: - case ENOBUFS: - case ENOMEM: - if (nxt_event_conn_accept_close_idle(task, cls) != NXT_OK) { - nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " - "new connections are not accepted within 1s", - accept_syscall, cls->socket.fd, err); - } - - return; - - default: - nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", - accept_syscall, cls->socket.fd, err); - return; - } -} - - -static void -nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; - - ev = obj; - - cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer); - c = cls->next; - - if (c == NULL) { - c = nxt_event_conn_accept_next(task, cls); - - if (c == NULL) { - return; - } - } - - nxt_fd_event_enable_accept(task->thread->engine, &cls->socket); - - cls->accept(task, cls, c); -} - - -static void -nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_fd_event_t *ev; - - ev = obj; - - nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); -} diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c deleted file mode 100644 index 55aa33f9..00000000 --- a/src/nxt_event_conn_connect.c +++ /dev/null @@ -1,194 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -void -nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_work_handler_t handler; - - c = obj; - - if (nxt_event_conn_socket(task, c) == NXT_OK) { - c->socket.write_work_queue = c->write_work_queue; - handler = c->io->connect; - - } else { - handler = c->write_state->error_handler; - } - - nxt_work_queue_add(&task->thread->engine->connect_work_queue, - handler, task, c, data); -} - - -void -nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_work_handler_t handler; - nxt_event_engine_t *engine; - const nxt_event_conn_state_t *state; - - c = obj; - - state = c->write_state; - - switch (nxt_socket_connect(task, c->socket.fd, c->remote)) { - - case NXT_OK: - c->socket.write_ready = 1; - handler = state->ready_handler; - break; - - case NXT_AGAIN: - c->socket.write_handler = nxt_event_conn_connect_test; - c->socket.error_handler = state->error_handler; - - engine = task->thread->engine; - - nxt_event_conn_timer(engine, c, state, &c->write_timer); - - nxt_fd_event_enable_write(engine, &c->socket); - return; - - case NXT_DECLINED: - handler = state->close_handler; - break; - - default: /* NXT_ERROR */ - handler = state->error_handler; - break; - } - - nxt_work_queue_add(c->write_work_queue, handler, task, c, data); -} - - -nxt_int_t -nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) -{ - nxt_uint_t family; - nxt_socket_t s; - - nxt_debug(task, "event conn socket"); - - family = c->remote->u.sockaddr.sa_family; - - s = nxt_socket_create(task, family, c->remote->type, 0, NXT_NONBLOCK); - - if (nxt_slow_path(s == -1)) { - return NXT_ERROR; - } - - c->sendfile = 1; - -#if (NXT_HAVE_UNIX_DOMAIN && NXT_SOLARIS) - - if (family == AF_UNIX) { - /* Solaris AF_UNIX does not support sendfilev(). */ - c->sendfile = 0; - } - -#endif - - c->socket.fd = s; - - c->socket.task = task; - c->read_timer.task = task; - c->write_timer.task = task; - - if (c->local != NULL) { - if (nxt_slow_path(nxt_socket_bind(task, s, c->local, 0) != NXT_OK)) { - nxt_socket_close(task, s); - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -void -nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data) -{ - int ret, err; - socklen_t len; - nxt_event_conn_t *c; - - c = obj; - - nxt_debug(task, "event connect test fd:%d", c->socket.fd); - - nxt_fd_event_block_write(task->thread->engine, &c->socket); - - if (c->write_state->timer_autoreset) { - nxt_timer_disable(task->thread->engine, &c->write_timer); - } - - err = 0; - len = sizeof(int); - - /* - * Linux and BSDs return 0 and store a pending error in the err argument; - * Solaris returns -1 and sets the errno. - */ - - ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len); - - if (nxt_slow_path(ret == -1)) { - err = nxt_errno; - } - - if (err == 0) { - nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, - task, c, data); - return; - } - - c->socket.error = err; - - nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", - c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote)); - - nxt_event_conn_connect_error(task, c, data); -} - - -void -nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_work_handler_t handler; - const nxt_event_conn_state_t *state; - - c = obj; - - state = c->write_state; - - switch (c->socket.error) { - - case NXT_ECONNREFUSED: -#if (NXT_LINUX) - case NXT_EAGAIN: - /* - * Linux returns EAGAIN instead of ECONNREFUSED - * for UNIX sockets if a listen queue is full. - */ -#endif - handler = state->close_handler; - break; - - default: - handler = state->error_handler; - break; - } - - nxt_work_queue_add(c->write_work_queue, handler, task, c, data); -} diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c index dae5079c..15d7fe1d 100644 --- a/src/nxt_event_conn_job_sendfile.c +++ b/src/nxt_event_conn_job_sendfile.c @@ -25,11 +25,11 @@ static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data); static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, - nxt_event_conn_t *c, nxt_buf_t *b); + nxt_conn_t *c, nxt_buf_t *b); void -nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) +nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c) { nxt_fd_event_disable(task->thread->engine, &c->socket); @@ -41,8 +41,8 @@ nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) { + nxt_conn_t *c; nxt_iobuf_t b; - nxt_event_conn_t *c; nxt_job_sendfile_t *jbs; nxt_sendbuf_coalesce_t sb; @@ -99,7 +99,7 @@ nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) ssize_t ret; nxt_buf_t *b; nxt_bool_t first; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_job_sendfile_t *jbs; jbs = obj; @@ -166,7 +166,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) size_t sent; nxt_buf_t *b; nxt_bool_t done; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_job_sendfile_t *jbs; jbs = obj; @@ -212,8 +212,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) if (c->socket.error == 0 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) { - nxt_event_conn_timer(task->thread->engine, c, c->write_state, - &c->write_timer); + nxt_conn_timer(task->thread->engine, c, c->write_state, + &c->write_timer); nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); } @@ -235,7 +235,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) static nxt_buf_t * -nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, +nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c, nxt_buf_t *b) { while (b != NULL) { diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c deleted file mode 100644 index 45a6b257..00000000 --- a/src/nxt_event_conn_proxy.c +++ /dev/null @@ -1,1017 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, - void *obj, void *data); -static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_read_process(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, - nxt_event_conn_t *sink); -static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_process(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, - nxt_event_conn_t *source); -static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, - void *data); -static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, - uintptr_t data); -static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_shutdown(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, - nxt_event_conn_t *sink); -static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_complete(nxt_task_t *task, - nxt_event_conn_proxy_t *p); -static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, - void *data); - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state; -static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state; - - -nxt_event_conn_proxy_t * -nxt_event_conn_proxy_create(nxt_event_conn_t *client) -{ - nxt_thread_t *thr; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_event_conn_proxy_t)); - if (nxt_slow_path(p == NULL)) { - return NULL; - } - - peer = nxt_event_conn_create(client->mem_pool, client->socket.task); - if (nxt_slow_path(peer == NULL)) { - return NULL; - } - - thr = nxt_thread(); - - client->read_work_queue = &thr->engine->read_work_queue; - client->write_work_queue = &thr->engine->write_work_queue; - client->socket.read_work_queue = &thr->engine->read_work_queue; - client->socket.write_work_queue = &thr->engine->write_work_queue; - peer->socket.read_work_queue = &thr->engine->read_work_queue; - peer->socket.write_work_queue = &thr->engine->write_work_queue; - - peer->socket.data = client->socket.data; - - peer->read_work_queue = client->read_work_queue; - peer->write_work_queue = client->write_work_queue; - peer->read_timer.work_queue = client->read_work_queue; - peer->write_timer.work_queue = client->write_work_queue; - - p->client = client; - p->peer = peer; - - return p; -} - - -void -nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) -{ - nxt_event_conn_t *peer; - - /* - * Peer read event: not connected, disabled. - * Peer write event: not connected, disabled. - */ - - if (p->client_wait_timeout == 0) { - /* - * Peer write event: waiting for connection - * to be established with connect_timeout. - */ - peer = p->peer; - peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - - nxt_event_conn_connect(task->thread->engine, peer); - } - - /* - * Client read event: waiting for client data with - * client_wait_timeout before buffer allocation. - */ - p->client->read_state = &nxt_event_conn_proxy_client_wait_state; - - nxt_conn_wait(p->client); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_buffer_alloc, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout), -}; - - -static void -nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, - void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client first read fd:%d", - client->socket.fd); - - b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, - NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); - - if (nxt_slow_path(b == NULL)) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->client_buffer = b; - client->read = b; - - if (p->peer->socket.fd != -1) { - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: disabled. - * Peer write event: waiting for connection to be established - * or blocked after the connection has established. - */ - client->read_state = &nxt_event_conn_proxy_client_read_state; - - } else { - /* - * Client read event: waiting for data with client_wait_timeout - * before connecting to a peer. - * Client write event: blocked. - * Peer read event: not connected, disabled. - * Peer write event: not connected, disabled. - */ - client->read_state = &nxt_event_conn_proxy_client_first_read_state; - } - - nxt_event_conn_read(task->thread->engine, client); -} - - -static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_connect, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: disabled. - * Peer write event: waiting for connection to be established - * with connect_timeout. - */ - client->read_state = &nxt_event_conn_proxy_client_read_state; - - p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - - nxt_event_conn_connect(task->thread->engine, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_connected, - .close_handler = nxt_event_conn_proxy_refused, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, connect_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client, *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd); - - p->connected = 1; - - nxt_event_conn_tcp_nodelay_on(task, peer); - nxt_event_conn_tcp_nodelay_on(task, p->client); - - /* Peer read event: waiting with peer_wait_timeout. */ - - peer->read_state = &nxt_event_conn_proxy_peer_wait_state; - peer->write_state = &nxt_event_conn_proxy_peer_write_state; - - nxt_conn_wait(peer); - - if (p->client_buffer != NULL) { - client = p->client; - - client->read_state = &nxt_event_conn_proxy_client_read_state; - client->write_state = &nxt_event_conn_proxy_client_write_state; - /* - * Send a client read data to the connected peer. - * Client write event: blocked. - */ - nxt_event_conn_proxy_read_process(task, p, client, peer); - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_read, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, peer_wait_timeout), -}; - - -static void -nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd); - - b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, - NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); - - if (nxt_slow_path(b == NULL)) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->peer_buffer = b; - peer->read = b; - - p->client->write_state = &nxt_event_conn_proxy_client_write_state; - peer->read_state = &nxt_event_conn_proxy_peer_read_state; - peer->write_state = &nxt_event_conn_proxy_peer_write_state; - - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: waiting with possible peer_wait_timeout. - * Peer write event: blocked. - */ - nxt_event_conn_read(task->thread->engine, peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_read_ready, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_read_error, -}; - - -static void -nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client read ready fd:%d", - client->socket.fd); - - nxt_event_conn_proxy_read_process(task, p, client, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_read_ready, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_read_error, -}; - - -static void -nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd); - - nxt_event_conn_proxy_read_process(task, p, peer, p->client); -} - - -static void -nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *source, nxt_event_conn_t *sink) -{ - nxt_buf_t *rb, *wb; - - if (sink->socket.error != 0) { - nxt_debug(task, "event conn proxy sink fd:%d error:%d", - sink->socket.fd, sink->socket.error); - - nxt_event_conn_proxy_write_error(task, sink, sink->socket.data); - return; - } - - while (source->read != NULL) { - - rb = source->read; - - if (rb->mem.pos != rb->mem.free) { - - /* Add a read part to a write chain. */ - - wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); - if (wb == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - wb->mem.pos = rb->mem.pos; - wb->mem.free = rb->mem.free; - wb->mem.start = rb->mem.pos; - wb->mem.end = rb->mem.free; - - rb->mem.pos = rb->mem.free; - rb->mem.start = rb->mem.free; - - nxt_event_conn_proxy_write_add(sink, wb); - } - - if (rb->mem.start != rb->mem.end) { - nxt_work_queue_add(source->read_work_queue, - nxt_event_conn_proxy_read, - task, source, source->socket.data); - break; - } - - source->read = rb->next; - nxt_buf_free(source->mem_pool, rb); - } - - if (p->connected) { - nxt_event_conn_write(task->thread->engine, sink); - } -} - - -static void -nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b) -{ - nxt_buf_t *first, *second, *prev; - - first = c->write; - - if (first == NULL) { - c->write = b; - return; - } - - /* - * A event conn proxy maintains a buffer per each direction. - * The buffer is divided by read and write parts. These parts are - * linked in buffer chains. There can be no more than two buffers - * in write chain at any time, because an added buffer is coalesced - * with the last buffer if possible. - */ - - second = first->next; - - if (second == NULL) { - - if (first->mem.end != b->mem.start) { - first->next = b; - return; - } - - /* - * The first buffer is just before the added buffer, so - * expand the first buffer to the end of the added buffer. - */ - prev = first; - - } else { - if (second->mem.end != b->mem.start) { - nxt_thread_log_alert("event conn proxy write: second buffer end:%p " - "is not equal to added buffer start:%p", - second->mem.end, b->mem.start); - return; - } - - /* - * "second->mem.end == b->mem.start" must be always true here, - * that is the second buffer is just before the added buffer, - * so expand the second buffer to the end of added buffer. - */ - prev = second; - } - - prev->mem.free = b->mem.end; - prev->mem.end = b->mem.end; - - nxt_buf_free(c->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - source = obj; - p = data; - - nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd); - - if (!source->socket.closed) { - sink = (source == p->client) ? p->peer : p->client; - - if (sink->socket.error == 0) { - nxt_event_conn_read(task->thread->engine, source); - } - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_write_ready, - .error_handler = nxt_event_conn_proxy_write_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_write_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client write ready fd:%d", - client->socket.fd); - - nxt_event_conn_proxy_write_process(task, p, client, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_write_ready, - .error_handler = nxt_event_conn_proxy_write_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, peer_write_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd); - - nxt_event_conn_proxy_write_process(task, p, peer, p->client); -} - - -static void -nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *sink, nxt_event_conn_t *source) -{ - nxt_buf_t *rb, *wb; - - while (sink->write != NULL) { - - wb = sink->write; - - if (nxt_buf_is_sync(wb)) { - - /* A sync buffer marks the end of stream. */ - - sink->write = NULL; - nxt_buf_free(sink->mem_pool, wb); - nxt_event_conn_proxy_shutdown(task, p, source, sink); - return; - } - - if (wb->mem.start != wb->mem.pos) { - - /* Add a written part to a read chain. */ - - rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); - if (rb == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - rb->mem.pos = wb->mem.start; - rb->mem.free = wb->mem.start; - rb->mem.start = wb->mem.start; - rb->mem.end = wb->mem.pos; - - wb->mem.start = wb->mem.pos; - - nxt_event_conn_proxy_read_add(source, rb); - } - - if (wb->mem.pos != wb->mem.free) { - nxt_event_conn_write(task->thread->engine, sink); - - break; - } - - sink->write = wb->next; - nxt_buf_free(sink->mem_pool, wb); - } - - nxt_work_queue_add(source->read_work_queue, - nxt_event_conn_proxy_read, task, source, - source->socket.data); -} - - -static void -nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b) -{ - nxt_buf_t *first, *second; - - first = c->read; - - if (first == NULL) { - c->read = b; - return; - } - - /* - * A event conn proxy maintains a buffer per each direction. - * The buffer is divided by read and write parts. These parts are - * linked in buffer chains. There can be no more than two buffers - * in read chain at any time, because an added buffer is coalesced - * with the last buffer if possible. The first and the second - * buffers are also coalesced if possible. - */ - - second = first->next; - - if (second == NULL) { - - if (first->mem.start == b->mem.end) { - /* - * The added buffer is just before the first buffer, so expand - * the first buffer to the beginning of the added buffer. - */ - first->mem.pos = b->mem.start; - first->mem.free = b->mem.start; - first->mem.start = b->mem.start; - - } else if (first->mem.end == b->mem.start) { - /* - * The added buffer is just after the first buffer, so - * expand the first buffer to the end of the added buffer. - */ - first->mem.end = b->mem.end; - - } else { - first->next = b; - return; - } - - } else { - if (second->mem.end != b->mem.start) { - nxt_thread_log_alert("event conn proxy read: second buffer end:%p " - "is not equal to added buffer start:%p", - second->mem.end, b->mem.start); - return; - } - - /* - * The added buffer is just after the second buffer, so - * expand the second buffer to the end of the added buffer. - */ - second->mem.end = b->mem.end; - - if (first->mem.start == second->mem.end) { - /* - * The second buffer is just before the first buffer, so expand - * the first buffer to the beginning of the second buffer. - */ - first->mem.pos = second->mem.start; - first->mem.free = second->mem.start; - first->mem.start = second->mem.start; - first->next = NULL; - - nxt_buf_free(c->mem_pool, second); - } - } - - nxt_buf_free(c->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - source = obj; - p = data; - - nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd); - - sink = (source == p->client) ? p->peer : p->client; - - if (sink->write == NULL) { - nxt_event_conn_proxy_shutdown(task, p, source, sink); - return; - } - - b = nxt_buf_sync_alloc(source->mem_pool, 0); - if (b == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - nxt_buf_chain_add(&sink->write, b); -} - - -static void -nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; - - c = obj; - p = data; - - nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, p); -} - - -static void -nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - c = nxt_event_read_timer_conn(ev); - c->socket.timedout = 1; - c->socket.closed = 1; - - nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, c->socket.data); -} - - -static void -nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - c = nxt_event_write_timer_conn(ev); - c->socket.timedout = 1; - c->socket.closed = 1; - - nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, c->socket.data); -} - - -static nxt_msec_t -nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data) -{ - nxt_msec_t *timer; - nxt_event_conn_proxy_t *p; - - p = c->socket.data; - - timer = (nxt_msec_t *) ((char *) p + data); - - return *timer; -} - - -static void -nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd); - - if (p->retries == 0) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->retries--; - - nxt_socket_close(task, peer->socket.fd); - peer->socket.fd = -1; - peer->socket.error = 0; - - p->delayed = 1; - - peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler; - nxt_timer_add(task->thread->engine, &peer->write_timer, - p->reconnect_timeout); -} - - -static void -nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - ev = obj; - - nxt_debug(task, "event conn proxy reconnect timer"); - - peer = nxt_event_write_timer_conn(ev); - p = peer->socket.data; - - if (p->client->socket.closed) { - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->delayed = 0; - - peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - /* - * Peer read event: disabled. - * Peer write event: waiting for connection with connect_timeout. - */ - nxt_event_conn_connect(task->thread->engine, peer); -} - - -static void -nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *source, nxt_event_conn_t *sink) -{ - nxt_buf_t *b; - - nxt_debug(source->socket.task, - "event conn proxy shutdown source fd:%d cl:%d err:%d", - source->socket.fd, source->socket.closed, source->socket.error); - - nxt_debug(sink->socket.task, - "event conn proxy shutdown sink fd:%d cl:%d err:%d", - sink->socket.fd, sink->socket.closed, sink->socket.error); - - if (!p->connected || p->delayed) { - nxt_event_conn_proxy_complete(task, p); - return; - } - - if (sink->socket.error == 0 && !sink->socket.closed) { - sink->socket.shutdown = 1; - nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); - } - - if (sink->socket.error != 0 - || (sink->socket.closed && source->write == NULL)) - { - /* The opposite direction also has been already closed. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - nxt_debug(source->socket.task, "free source buffer"); - - /* Free the direction's buffer. */ - b = (source == p->client) ? p->client_buffer : p->peer_buffer; - nxt_mem_free(source->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; - - c = obj; - p = data; - - nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, p); -} - - -static void -nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - sink = obj; - p = data; - - nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd); - - /* Clear data for the direction sink. */ - sink->write = NULL; - - /* Block the direction source. */ - source = (sink == p->client) ? p->peer : p->client; - nxt_fd_event_block_read(task->thread->engine, &source->socket); - - if (source->write == NULL) { - /* - * There is no data for the opposite direction and - * the next read from the sink will most probably fail. - */ - nxt_event_conn_proxy_complete(task, p); - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_completion, -}; - - -static void -nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) -{ - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", - p->client->socket.fd, p->peer->socket.fd); - - if (p->delayed) { - p->delayed = 0; - nxt_queue_remove(&p->peer->link); - } - - if (p->client->socket.fd != -1) { - p->retain = 1; - p->client->write_state = &nxt_event_conn_proxy_close_state; - nxt_event_conn_close(engine, p->client); - } - - if (p->peer->socket.fd != -1) { - p->retain++; - p->peer->write_state = &nxt_event_conn_proxy_close_state; - nxt_event_conn_close(engine, p->peer); - } -} - - -static void -nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_proxy_t *p; - - p = data; - - nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d", - p->retain, p->client->socket.fd, p->peer->socket.fd); - - p->retain--; - - if (p->retain == 0) { - nxt_mem_free(p->client->mem_pool, p->client_buffer); - nxt_mem_free(p->client->mem_pool, p->peer_buffer); - - p->completion_handler(task, p, NULL); - } -} diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c deleted file mode 100644 index be2dfdb2..00000000 --- a/src/nxt_event_conn_read.c +++ /dev/null @@ -1,243 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -void -nxt_conn_wait(nxt_event_conn_t *c) -{ - nxt_event_engine_t *engine; - const nxt_event_conn_state_t *state; - - nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", - c->socket.fd, c->socket.read_ready); - - engine = c->socket.task->thread->engine; - state = c->read_state; - - if (c->socket.read_ready) { - nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, - c->socket.task, c, c->socket.data); - return; - } - - c->socket.read_handler = state->ready_handler; - c->socket.error_handler = state->error_handler; - - nxt_event_conn_timer(engine, c, state, &c->read_timer); - - nxt_fd_event_enable_read(engine, &c->socket); -} - - -void -nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) -{ - ssize_t n; - nxt_buf_t *b; - nxt_work_queue_t *wq; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - nxt_work_handler_t handler; - const nxt_event_conn_state_t *state; - - c = obj; - - nxt_debug(task, "event conn read fd:%d rdy:%d cl:%d", - c->socket.fd, c->socket.read_ready, c->socket.closed); - - engine = task->thread->engine; - - state = c->read_state; - - if (c->socket.read_ready) { - - b = c->read; - - if (c->peek == 0) { - n = c->io->recvbuf(c, b); - - } else { - n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK); - } - - if (n > 0) { - c->nbytes = n; - - nxt_recvbuf_update(b, n); - - nxt_fd_event_block_read(engine, &c->socket); - - if (state->timer_autoreset) { - nxt_timer_disable(engine, &c->read_timer); - } - - wq = c->read_work_queue; - handler = state->ready_handler; - - nxt_work_queue_add(wq, handler, task, c, data); - - return; - } - - if (n != NXT_AGAIN) { - nxt_fd_event_block_read(engine, &c->socket); - nxt_timer_disable(engine, &c->read_timer); - - wq = &engine->fast_work_queue; - - handler = (n == 0) ? state->close_handler : state->error_handler; - - nxt_work_queue_add(wq, handler, task, c, data); - - return; - } - } - - /* - * Here c->io->read() is assigned instead of direct - * nxt_event_conn_io_read() because the function can - * be called by nxt_kqueue_event_conn_io_read(). - */ - c->socket.read_handler = c->io->read; - c->socket.error_handler = state->error_handler; - - if (c->read_timer.state == NXT_TIMER_DISABLED - || nxt_fd_event_is_disabled(c->socket.read)) - { - /* Timer may be set or reset. */ - nxt_event_conn_timer(engine, c, state, &c->read_timer); - - if (nxt_fd_event_is_disabled(c->socket.read)) { - nxt_fd_event_enable_read(engine, &c->socket); - } - } - - return; -} - - -ssize_t -nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) -{ - ssize_t n; - nxt_err_t err; - nxt_uint_t niov; - struct iovec iov[NXT_IOBUF_MAX]; - nxt_recvbuf_coalesce_t rb; - - rb.buf = b; - rb.iobuf = iov; - rb.nmax = NXT_IOBUF_MAX; - rb.size = 0; - - niov = nxt_recvbuf_mem_coalesce(&rb); - - if (niov == 1) { - /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_event_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); - } - - for ( ;; ) { - n = readv(c->socket.fd, iov, niov); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); - - if (n > 0) { - if ((size_t) n < rb.size) { - c->socket.read_ready = 0; - } - - return n; - } - - if (n == 0) { - c->socket.closed = 1; - c->socket.read_ready = 0; - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(c->socket.task, "readv() %E", err); - c->socket.read_ready = 0; - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(c->socket.task, "readv() %E", err); - continue; - - default: - c->socket.error = err; - nxt_log(c->socket.task, nxt_socket_error_level(err), - "readv(%d, %ui) failed %E", c->socket.fd, niov, err); - - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, - nxt_uint_t flags) -{ - ssize_t n; - nxt_err_t err; - - for ( ;; ) { - n = recv(c->socket.fd, buf, size, flags); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", - c->socket.fd, buf, size, flags, n); - - if (n > 0) { - if ((size_t) n < size) { - c->socket.read_ready = 0; - } - - return n; - } - - if (n == 0) { - c->socket.closed = 1; - c->socket.read_ready = 0; - - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(c->socket.task, "recv() %E", err); - c->socket.read_ready = 0; - - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(c->socket.task, "recv() %E", err); - continue; - - default: - c->socket.error = err; - nxt_log(c->socket.task, nxt_socket_error_level(err), - "recv(%d, %p, %uz, %ui) failed %E", - c->socket.fd, buf, size, flags, err); - - return NXT_ERROR; - } - } -} diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c deleted file mode 100644 index fa5b9241..00000000 --- a/src/nxt_event_conn_write.c +++ /dev/null @@ -1,420 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include - - -static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, - void *data); - - -void -nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) -{ - ssize_t ret; - nxt_buf_t *b; - nxt_sendbuf_t sb; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - c = obj; - - nxt_debug(task, "event conn write fd:%d", c->socket.fd); - - if (!c->socket.write_ready || c->write == NULL) { - return; - } - - engine = task->thread->engine; - - c->socket.write_handler = nxt_conn_io_write; - c->socket.error_handler = c->write_state->error_handler; - - b = c->write; - - sb.socket = c->socket.fd; - sb.error = 0; - sb.sent = 0; - sb.size = 0; - sb.buf = b; - sb.limit = 10 * 1024 * 1024; - sb.ready = 1; - sb.sync = 0; - - do { - ret = nxt_conn_io_sendbuf(task, &sb); - - c->socket.write_ready = sb.ready; - c->socket.error = sb.error; - - if (ret < 0) { - /* ret == NXT_AGAIN || ret == NXT_ERROR. */ - break; - } - - sb.sent += ret; - sb.limit -= ret; - - b = nxt_sendbuf_update(b, ret); - - if (b == NULL) { - nxt_fd_event_block_write(engine, &c->socket); - break; - } - - sb.buf = b; - - if (!c->socket.write_ready) { - ret = NXT_AGAIN; - break; - } - - } while (sb.limit != 0); - - nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent); - - if (sb.sent != 0) { - if (c->write_state->timer_autoreset) { - nxt_timer_disable(engine, &c->write_timer); - } - } - - if (ret != NXT_ERROR) { - - if (sb.limit == 0) { - /* - * Postpone writing until next event poll to allow to - * process other recevied events and to get new events. - */ - c->write_timer.handler = nxt_event_conn_write_timer_handler; - nxt_timer_add(engine, &c->write_timer, 0); - - } else if (ret == NXT_AGAIN) { - /* - * SSL libraries can require to toggle either write or read - * event if renegotiation occurs during SSL write operation. - * This case is handled on the event_io->send() level. Timer - * can be set here because it should be set only for write - * direction. - */ - nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer); - - if (nxt_fd_event_is_disabled(c->socket.write)) { - nxt_fd_event_enable_write(engine, &c->socket); - } - } - } - - if (ret == 0 || sb.sent != 0) { - /* "ret == 0" means a sync buffer was processed. */ - c->sent += sb.sent; - nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, - task, c, data); - /* - * Fall through if first operations were - * successful but the last one failed. - */ - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_fd_event_block_write(engine, &c->socket); - - nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, - task, c, data); - } -} - - -size_t -nxt_event_conn_write_limit(nxt_event_conn_t *c) -{ - ssize_t limit, correction; - nxt_event_write_rate_t *rate; - - rate = c->rate; - - if (rate == NULL) { - return c->max_chunk; - } - - limit = rate->limit; - correction = limit - (size_t) rate->average; - - nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f", - correction, rate->average); - - limit += correction; - - if (limit <= 0) { - return 0; - } - - if (rate->limit_after != 0) { - limit += rate->limit_after; - limit = nxt_min((size_t) limit, rate->max_limit); - } - - return nxt_min((size_t) limit, c->max_chunk); -} - - -nxt_bool_t -nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, - size_t sent) -{ - return 0; -} - - -static void -nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - nxt_debug(task, "event conn conn timer"); - - c = nxt_event_write_timer_conn(ev); - c->delayed = 0; - - c->io->write(task, c, c->socket.data); -} - - -ssize_t -nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) -{ - ssize_t ret; - - ret = c->io->sendbuf(c, b, limit); - - if ((ret == NXT_AGAIN || !c->socket.write_ready) - && nxt_fd_event_is_disabled(c->socket.write)) - { - nxt_fd_event_enable_write(c->socket.task->thread->engine, &c->socket); - } - - return ret; -} - - -ssize_t -nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) -{ - nxt_uint_t niob; - struct iovec iob[NXT_IOBUF_MAX]; - nxt_sendbuf_coalesce_t sb; - - sb.buf = b; - sb.iobuf = iob; - sb.nmax = NXT_IOBUF_MAX; - sb.sync = 0; - sb.size = 0; - sb.limit = limit; - - niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); - - if (niob == 0 && sb.sync) { - return 0; - } - - return nxt_event_conn_io_writev(c, iob, niob); -} - - -ssize_t -nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) -{ - ssize_t n; - nxt_err_t err; - - if (niob == 1) { - /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len); - } - - for ( ;; ) { - n = writev(c->socket.fd, iob, niob); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(c->socket.task, "writev() %E", err); - c->socket.write_ready = 0; - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(c->socket.task, "writev() %E", err); - continue; - - default: - c->socket.error = err; - nxt_log(c->socket.task, nxt_socket_error_level(err), - "writev(%d, %ui) failed %E", c->socket.fd, niob, err); - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) -{ - ssize_t n; - nxt_err_t err; - - for ( ;; ) { - n = send(c->socket.fd, buf, size, 0); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(c->socket.task, "send(%d, %p, %uz): %z", - c->socket.fd, buf, size, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(c->socket.task, "send() %E", err); - c->socket.write_ready = 0; - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(c->socket.task, "send() %E", err); - continue; - - default: - c->socket.error = err; - nxt_log(c->socket.task, nxt_socket_error_level(err), - "send(%d, %p, %uz) failed %E", - c->socket.fd, buf, size, err); - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) -{ - nxt_uint_t niov; - struct iovec iov[NXT_IOBUF_MAX]; - - niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); - - if (niov == 0 && sb->sync) { - return 0; - } - - return nxt_conn_io_writev(task, sb, iov, niov); -} - - -ssize_t -nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, - nxt_uint_t niov) -{ - ssize_t n; - nxt_err_t err; - - if (niov == 1) { - /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); - } - - for ( ;; ) { - n = writev(sb->socket, iov, niov); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - sb->ready = 0; - nxt_debug(task, "writev() %E", err); - - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(task, "writev() %E", err); - continue; - - default: - sb->error = err; - nxt_log(task, nxt_socket_error_level(err), - "writev(%d, %ui) failed %E", sb->socket, niov, err); - - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) -{ - ssize_t n; - nxt_err_t err; - - for ( ;; ) { - n = send(sb->socket, buf, size, 0); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - sb->ready = 0; - nxt_debug(task, "send() %E", err); - - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(task, "send() %E", err); - continue; - - default: - sb->error = err; - nxt_log(task, nxt_socket_error_level(err), - "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); - - return NXT_ERROR; - } - } -} diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 6947cc8d..5b602bc0 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -123,13 +123,13 @@ typedef struct { * events. */ void (*enable_file)(nxt_event_engine_t *engine, - nxt_event_file_t *fev); + nxt_file_event_t *ev); /* * Delete a file from an event set before closing the file descriptor. */ void (*close_file)(nxt_event_engine_t *engine, - nxt_event_file_t *fev); + nxt_file_event_t *ev); /* * Enable post event notifications and set a post handler to handle @@ -157,7 +157,7 @@ typedef struct { nxt_msec_t timeout); /* I/O operations suitable to underlying event facility. */ - nxt_event_conn_io_t *io; + nxt_conn_io_t *io; /* True if an event facility supports file change event notifications. */ uint8_t file_support; /* 1 bit */ diff --git a/src/nxt_event_file.h b/src/nxt_event_file.h deleted file mode 100644 index 06f1762f..00000000 --- a/src/nxt_event_file.h +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_EVENT_FILE_H_INCLUDED_ -#define _NXT_EVENT_FILE_H_INCLUDED_ - - -typedef struct { - void *data; - nxt_file_t *file; - nxt_work_handler_t handler; - nxt_task_t *task; -} nxt_event_file_t; - - -#endif /* _NXT_EVENT_FILE_H_INCLUDED_ */ diff --git a/src/nxt_eventport_engine.c b/src/nxt_eventport_engine.c index 7dc0ffa8..2bf111f9 100644 --- a/src/nxt_eventport_engine.c +++ b/src/nxt_eventport_engine.c @@ -78,7 +78,7 @@ const nxt_event_interface_t nxt_eventport_engine = { nxt_eventport_signal, nxt_eventport_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_file_event.h b/src/nxt_file_event.h new file mode 100644 index 00000000..d4255ce9 --- /dev/null +++ b/src/nxt_file_event.h @@ -0,0 +1,18 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_FILE_EVENT_H_INCLUDED_ +#define _NXT_FILE_EVENT_H_INCLUDED_ + + +typedef struct { + void *data; + nxt_file_t *file; + nxt_work_handler_t handler; + nxt_task_t *task; +} nxt_file_event_t; + + +#endif /* _NXT_FILE_EVENT_H_INCLUDED_ */ diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index c5e40eeb..4582b314 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -78,9 +78,9 @@ static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, - nxt_event_file_t *ev); + nxt_file_event_t *ev); static void nxt_kqueue_close_file(nxt_event_engine_t *engine, - nxt_event_file_t *ev); + nxt_file_event_t *ev); static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_int_t filter, nxt_uint_t flags); static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); @@ -98,26 +98,25 @@ static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); #endif static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); -static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data); static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, - nxt_buf_t *b); +static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); -static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { - nxt_kqueue_event_conn_io_connect, - nxt_kqueue_event_conn_io_accept, +static nxt_conn_io_t nxt_kqueue_conn_io = { + nxt_kqueue_conn_io_connect, + nxt_kqueue_conn_io_accept, - nxt_kqueue_event_conn_io_read, - nxt_kqueue_event_conn_io_recvbuf, - nxt_event_conn_io_recv, + nxt_kqueue_conn_io_read, + nxt_kqueue_conn_io_recvbuf, + nxt_conn_io_recv, nxt_conn_io_write, nxt_event_conn_io_write_chunk, @@ -133,7 +132,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { nxt_event_conn_io_writev, nxt_event_conn_io_send, - nxt_event_conn_io_shutdown, + nxt_conn_io_shutdown, }; @@ -165,7 +164,7 @@ const nxt_event_interface_t nxt_kqueue_engine = { #endif nxt_kqueue_poll, - &nxt_kqueue_event_conn_io, + &nxt_kqueue_conn_io, NXT_FILE_EVENTS, NXT_SIGNAL_EVENTS, @@ -414,7 +413,7 @@ nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) static void -nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) +nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) { struct kevent *kev; @@ -437,7 +436,7 @@ nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) static void -nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) +nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) { /* TODO: pending event. */ } @@ -497,7 +496,7 @@ nxt_kqueue_error(nxt_event_engine_t *engine) { struct kevent *kev, *end; nxt_fd_event_t *ev; - nxt_event_file_t *fev; + nxt_file_event_t *fev; nxt_work_queue_t *wq; wq = &engine->fast_work_queue; @@ -551,7 +550,7 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_file_t *ev; + nxt_file_event_t *ev; ev = obj; @@ -678,7 +677,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_fd_event_t *ev; nxt_sig_event_t *sigev; struct timespec ts, *tp; - nxt_event_file_t *fev; + nxt_file_event_t *fev; nxt_work_queue_t *wq; nxt_work_handler_t handler; @@ -850,9 +849,9 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) */ static void -nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -869,11 +868,11 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; case NXT_AGAIN: - c->socket.write_handler = nxt_kqueue_event_conn_connected; - c->socket.error_handler = nxt_event_conn_connect_error; + c->socket.write_handler = nxt_kqueue_conn_connected; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); nxt_kqueue_enable_write(engine, &c->socket); return; @@ -892,13 +891,13 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) static void -nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; - nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd); + nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd); c->socket.write = NXT_EVENT_BLOCKED; @@ -914,36 +913,36 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_listen_t *cls; + nxt_listen_event_t *lev; - cls = obj; + lev = obj; nxt_debug(task, "kevent fd:%d avail:%D", - cls->socket.fd, cls->socket.kq_available); + lev->socket.fd, lev->socket.kq_available); - cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); + lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available); - nxt_kqueue_event_conn_io_accept(task, cls, data); + nxt_kqueue_conn_io_accept(task, lev, data); } static void -nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data) { - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; - cls = obj; - c = cls->next; + lev = obj; + c = lev->next; - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); - cls->socket.kq_available--; - cls->socket.read_ready = (cls->socket.kq_available != 0); + lev->socket.kq_available--; + lev->socket.read_ready = (lev->socket.kq_available != 0); len = c->remote->socklen; @@ -955,34 +954,34 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) len = 0; } - s = accept(cls->socket.fd, sa, &len); + s = accept(lev->socket.fd, sa, &len); if (s != -1) { c->socket.fd = s; - nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); - nxt_event_conn_accept(task, cls, c); + nxt_conn_accept(task, lev, c); return; } - nxt_event_conn_accept_error(task, cls, "accept", nxt_errno); + nxt_conn_accept_error(task, lev, "accept", nxt_errno); } /* - * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the + * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the * readv() or recv() syscall if a remote side just closed connection. */ static void -nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; - nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd); + nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd); if (c->socket.kq_available == 0 && c->socket.kq_eof) { nxt_debug(task, "kevent fd:%d eof", c->socket.fd); @@ -993,18 +992,18 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_io_read(task, c, data); + nxt_conn_io_read(task, c, data); } /* - * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard - * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls + * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard + * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls * if there is no pending data or a remote side closed connection. */ static ssize_t -nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; @@ -1013,7 +1012,7 @@ nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) return 0; } - n = nxt_event_conn_io_recvbuf(c, b); + n = nxt_conn_io_recvbuf(c, b); if (n > 0) { c->socket.kq_available -= n; diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c index 08751dfc..7a0e3a7c 100644 --- a/src/nxt_listen_socket.c +++ b/src/nxt_listen_socket.c @@ -241,7 +241,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls) #endif return size + sizeof(nxt_mem_pool_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_t) + sizeof(nxt_log_t); } diff --git a/src/nxt_macosx_sendfile.c b/src/nxt_macosx_sendfile.c index f636819c..2c6ea954 100644 --- a/src/nxt_macosx_sendfile.c +++ b/src/nxt_macosx_sendfile.c @@ -26,8 +26,7 @@ static int nxt_sys_sendfile(int fd, int s, off_t offset, off_t *len, ssize_t -nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, - size_t limit) +nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { size_t hd_size, file_size; ssize_t n; diff --git a/src/nxt_main.h b/src/nxt_main.h index 24ec4ca1..19db0aa3 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -110,7 +110,7 @@ typedef struct nxt_port_mmap_s nxt_port_mmap_t; #include #include -typedef struct nxt_event_conn_s nxt_event_conn_t; +typedef struct nxt_conn_s nxt_conn_t; #include #include @@ -129,6 +129,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include #include +#include #include #include @@ -137,12 +138,10 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #endif -typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, - nxt_event_conn_t *c); +typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c); #include -#include -#include +#include #include #include diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c index 17e65a56..66f0ad32 100644 --- a/src/nxt_openssl.c +++ b/src/nxt_openssl.c @@ -23,23 +23,20 @@ typedef struct { static nxt_int_t nxt_openssl_server_init(nxt_ssltls_conf_t *conf); static void nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c); -static void nxt_openssl_session_cleanup(void *data); + nxt_conn_t *c); +static void nxt_openssl_session_cleanup(nxt_task_t *task, void *data); static void nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data); static void nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data); static void nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, - nxt_buf_t *b, size_t limit); -static ssize_t nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, - size_t size); +static ssize_t nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, + size_t limit); +static ssize_t nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size); static nxt_int_t nxt_openssl_conn_test_error(nxt_task_t *task, - nxt_event_conn_t *c, int ret, nxt_err_t sys_err, - nxt_work_handler_t handler); -static void nxt_cdecl nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, + nxt_conn_t *c, int ret, nxt_err_t sys_err, nxt_work_handler_t handler); +static void nxt_cdecl nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err, const char *fmt, ...); -static nxt_uint_t nxt_openssl_log_error_level(nxt_event_conn_t *c, - nxt_err_t err); +static nxt_uint_t nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err); static void nxt_cdecl nxt_openssl_log_error(nxt_uint_t level, nxt_log_t *log, const char *fmt, ...); static u_char *nxt_openssl_copy_error(u_char *p, u_char *end); @@ -51,7 +48,7 @@ const nxt_ssltls_lib_t nxt_openssl_lib = { }; -static nxt_event_conn_io_t nxt_openssl_event_conn_io = { +static nxt_conn_io_t nxt_openssl_conn_io = { NULL, NULL, @@ -249,8 +246,7 @@ fail: static void -nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c) +nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, nxt_conn_t *c) { int ret; SSL *s; @@ -301,7 +297,7 @@ nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, goto fail; } - c->io = &nxt_openssl_event_conn_io; + c->io = &nxt_openssl_conn_io; c->sendfile = NXT_CONN_SENDFILE_OFF; nxt_openssl_conn_handshake(task, c, c->socket.data); @@ -315,13 +311,13 @@ fail: static void -nxt_openssl_session_cleanup(void *data) +nxt_openssl_session_cleanup(nxt_task_t *task, void *data) { nxt_openssl_conn_t *ssltls; ssltls = data; - nxt_thread_log_debug("openssl session cleanup"); + nxt_debug(task, "openssl session cleanup"); nxt_free(ssltls->buffer.start); @@ -335,7 +331,7 @@ nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data) int ret; nxt_int_t n; nxt_err_t err; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_openssl_conn_t *ssltls; c = obj; @@ -382,7 +378,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_int_t n; nxt_err_t err; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_work_handler_t handler; nxt_openssl_conn_t *ssltls; @@ -432,7 +428,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data) static ssize_t -nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) +nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { nxt_openssl_conn_t *ssltls; @@ -445,7 +441,7 @@ nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) static ssize_t -nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) +nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size) { int ret; nxt_err_t err; @@ -491,7 +487,7 @@ nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) nxt_err_t err; nxt_int_t n; nxt_bool_t quiet, once; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_work_handler_t handler; nxt_openssl_conn_t *ssltls; @@ -586,7 +582,7 @@ done: static nxt_int_t -nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret, +nxt_openssl_conn_test_error(nxt_task_t *task, nxt_conn_t *c, int ret, nxt_err_t sys_err, nxt_work_handler_t handler) { u_long lib_err; @@ -664,7 +660,7 @@ nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret, static void nxt_cdecl -nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...) +nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err, const char *fmt, ...) { u_char *p, *end; va_list args; @@ -697,7 +693,7 @@ nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...) static nxt_uint_t -nxt_openssl_log_error_level(nxt_event_conn_t *c, nxt_err_t err) +nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err) { switch (ERR_GET_REASON(ERR_peek_error())) { diff --git a/src/nxt_poll_engine.c b/src/nxt_poll_engine.c index 90a8176e..607cd144 100644 --- a/src/nxt_poll_engine.c +++ b/src/nxt_poll_engine.c @@ -88,7 +88,7 @@ const nxt_event_interface_t nxt_poll_engine = { NULL, nxt_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_pollset_engine.c b/src/nxt_pollset_engine.c index 571ad794..402f954c 100644 --- a/src/nxt_pollset_engine.c +++ b/src/nxt_pollset_engine.c @@ -79,7 +79,7 @@ const nxt_event_interface_t nxt_pollset_engine = { NULL, nxt_pollset_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_router.c b/src/nxt_router.c index c87b48b8..67acc12d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -71,8 +71,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c, - uintptr_t data); +static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); nxt_int_t @@ -207,8 +206,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) skcf->listen.handler = nxt_router_conn_init; skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_proxy_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); skcf->header_buffer_size = 2048; @@ -222,8 +221,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) skcf->listen.handler = nxt_stream_connection_init; skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_proxy_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); skcf->header_read_timeout = 5000; @@ -881,7 +880,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_event_conn_state_t nxt_router_conn_read_state +static const nxt_conn_state_t nxt_router_conn_read_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_http_header_parse, @@ -898,7 +897,7 @@ static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) { size_t size; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_socket_conf_joint_t *joint; @@ -920,11 +919,11 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_router_conn_read_state; - nxt_event_conn_read(engine, c); + nxt_conn_read(engine, c); } -static const nxt_event_conn_state_t nxt_router_conn_write_state +static const nxt_conn_state_t nxt_router_conn_write_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_close, @@ -939,7 +938,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) size_t size; nxt_int_t ret; nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_socket_conf_joint_t *joint; nxt_http_request_parse_t *rp; @@ -996,7 +995,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) c->read = b; } - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); return; } @@ -1004,11 +1003,11 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) c->write->mem.pos = c->write->mem.start; c->write_state = &nxt_router_conn_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); } -static const nxt_event_conn_state_t nxt_router_conn_close_state +static const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_free, @@ -1018,7 +1017,7 @@ static const nxt_event_conn_state_t nxt_router_conn_close_state static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1026,14 +1025,14 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_socket_conf_joint_t *joint; c = obj; @@ -1050,7 +1049,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1058,30 +1057,30 @@ nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *timer; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; timer = obj; nxt_debug(task, "router conn timeout"); - c = nxt_event_read_timer_conn(timer); + c = nxt_read_timer_conn(timer); c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static nxt_msec_t -nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { nxt_socket_conf_joint_t *joint; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 62b0e3b5..e35baa8f 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -497,9 +497,9 @@ nxt_runtime_quit(nxt_task_t *task) static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) { + nxt_conn_t *c; nxt_queue_t *idle; nxt_queue_link_t *link, *next; - nxt_event_conn_t *c; nxt_debug(&engine->task, "close idle connections"); @@ -510,11 +510,11 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) link = next) { next = nxt_queue_next(link); - c = nxt_queue_link_data(link, nxt_event_conn_t, link); + c = nxt_queue_link_data(link, nxt_conn_t, link); if (!c->socket.read_ready) { nxt_queue_remove(link); - nxt_event_conn_close(engine, c); + nxt_conn_close(engine, c); } } } diff --git a/src/nxt_select_engine.c b/src/nxt_select_engine.c index 8a6f0710..6f760012 100644 --- a/src/nxt_select_engine.c +++ b/src/nxt_select_engine.c @@ -57,7 +57,7 @@ const nxt_event_interface_t nxt_select_engine = { NULL, nxt_select_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index e8fbe2a0..361cb0cd 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -202,8 +202,8 @@ nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) ssize_t -nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, - nxt_buf_t *b, size_t limit) +nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, + size_t limit) { size_t size, bsize, copied; ssize_t n; diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 0b789583..b159bfe5 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -66,41 +66,41 @@ typedef struct { #if (NXT_HAVE_LINUX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_linux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_FREEBSD_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_SOLARIS_SENDFILEV) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_MACOSX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_AIX_SEND_FILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_aix_event_conn_io_send_file(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_HPUX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_hpux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif -ssize_t nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit); @@ -116,7 +116,7 @@ size_t nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb); * SSL/TLS libraries which lack vector I/O interface yet add noticeable * overhead to each SSL/TLS record. */ -ssize_t nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, +ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, size_t limit); nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); diff --git a/src/nxt_ssltls.h b/src/nxt_ssltls.h index aa32348d..f12335a7 100644 --- a/src/nxt_ssltls.h +++ b/src/nxt_ssltls.h @@ -35,8 +35,7 @@ typedef struct { struct nxt_ssltls_conf_s { void *ctx; void (*conn_init)(nxt_task_t *task, - nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c); + nxt_ssltls_conf_t *conf, nxt_conn_t *c); const nxt_ssltls_lib_t *lib; diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c index e89cd6fd..6000daaf 100644 --- a/src/nxt_stream_module.c +++ b/src/nxt_stream_module.c @@ -17,8 +17,8 @@ static void nxt_stream_connection_close(nxt_task_t *task, void *obj, void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) { + nxt_conn_t *c; nxt_runtime_t *rt; - nxt_event_conn_t *c; nxt_upstream_peer_t *up; c = obj; @@ -57,8 +57,8 @@ fail: static void nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) { - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; + nxt_conn_t *c; + nxt_conn_proxy_t *p; c = up->data; @@ -67,7 +67,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) nxt_log_debug(c->socket.log, "stream connection peer %*s", up->sockaddr->length, nxt_sockaddr_start(up->sockaddr)); - p = nxt_event_conn_proxy_create(c); + p = nxt_conn_proxy_create(c); if (nxt_slow_path(p == NULL)) { goto fail; } @@ -107,7 +107,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) rate->last = engine->timers.now; } - nxt_event_conn_proxy(task, p); + nxt_conn_proxy(task, p); return; fail: diff --git a/src/nxt_stream_source.c b/src/nxt_stream_source.c index 9b667b99..66ec1640 100644 --- a/src/nxt_stream_source.c +++ b/src/nxt_stream_source.c @@ -58,7 +58,7 @@ nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream) stream->conn = c; c->socket.data = stream; - nxt_event_conn_work_queue_set(c, us->work_queue); + nxt_conn_work_queue_set(c, us->work_queue); c->remote = us->peer->sockaddr; c->write_state = &nxt_stream_source_connect_state; @@ -158,7 +158,7 @@ nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "stream source write ready fd:%d", c->socket.fd); - nxt_event_conn_read(task, c); + nxt_conn_read(task, c); } @@ -212,7 +212,7 @@ nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_stream_source_response_read_state; - nxt_event_conn_read(task, c); + nxt_conn_read(task, c); return; fail: @@ -425,7 +425,7 @@ nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "stream source closed fd:%d", c->socket.fd); - nxt_event_conn_close(task, c); + nxt_conn_close(task, c); b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool, NXT_BUF_SYNC_LAST); @@ -463,7 +463,7 @@ nxt_stream_source_error(nxt_task_t *task, void *obj, void *data) static void nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream) { - nxt_event_conn_close(task, stream->conn); + nxt_conn_close(task, stream->conn); stream->error_handler(task, stream); } diff --git a/src/nxt_stream_source.h b/src/nxt_stream_source.h index 27a6bfc8..2d57073f 100644 --- a/src/nxt_stream_source.h +++ b/src/nxt_stream_source.h @@ -14,7 +14,7 @@ typedef void (*nxt_stream_source_handler_t)(nxt_task_t *task, nxt_stream_source_t *s); struct nxt_stream_source_s { - nxt_event_conn_t *conn; + nxt_conn_t *conn; nxt_source_hook_t *next; nxt_upstream_source_t *upstream; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index b6266520..6407c734 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -45,12 +45,12 @@ const nxt_sig_event_t nxt_worker_process_signals[] = { static void nxt_worker_process_quit(nxt_task_t *task) { - nxt_uint_t n; - nxt_queue_t *listen; - nxt_runtime_t *rt; - nxt_queue_link_t *link, *next; - nxt_listen_socket_t *ls; - nxt_event_conn_listen_t *cls; + nxt_uint_t n; + nxt_queue_t *listen; + nxt_runtime_t *rt; + nxt_queue_link_t *link, *next; + nxt_listen_event_t *lev; + nxt_listen_socket_t *ls; rt = task->thread->runtime; @@ -63,10 +63,10 @@ nxt_worker_process_quit(nxt_task_t *task) link = next) { next = nxt_queue_next(link); - cls = nxt_queue_link_data(link, nxt_event_conn_listen_t, link); + lev = nxt_queue_link_data(link, nxt_listen_event_t, link); nxt_queue_remove(link); - nxt_fd_event_close(task->thread->engine, &cls->socket); + nxt_fd_event_close(task->thread->engine, &lev->socket); } if (rt->listen_sockets != NULL) { -- cgit