diff options
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/modules/ngx_devpoll_module.c | 8 | ||||
| -rw-r--r-- | src/event/modules/ngx_iocp_module.c | 137 | ||||
| -rw-r--r-- | src/event/modules/ngx_iocp_module.h | 15 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 35 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.h | 6 | ||||
| -rw-r--r-- | src/event/modules/ngx_poll_module.c | 9 | ||||
| -rw-r--r-- | src/event/modules/ngx_select_module.c | 6 | ||||
| -rw-r--r-- | src/event/ngx_event.c | 43 | ||||
| -rw-r--r-- | src/event/ngx_event.h | 71 | ||||
| -rw-r--r-- | src/event/ngx_event_accept.c | 41 | ||||
| -rw-r--r-- | src/event/ngx_event_acceptex.c | 150 | ||||
| -rw-r--r-- | src/event/ngx_event_acceptex.h | 13 | ||||
| -rw-r--r-- | src/event/ngx_event_aio_read.c | 113 | ||||
| -rw-r--r-- | src/event/ngx_event_aio_write.c | 115 | ||||
| -rw-r--r-- | src/event/ngx_event_close.c | 2 | ||||
| -rw-r--r-- | src/event/ngx_event_recv.c | 35 | ||||
| -rw-r--r-- | src/event/ngx_event_timer.h | 4 | ||||
| -rw-r--r-- | src/event/ngx_event_write.c | 73 | ||||
| -rw-r--r-- | src/event/ngx_event_wsarecv.c | 97 |
19 files changed, 896 insertions, 77 deletions
diff --git a/src/event/modules/ngx_devpoll_module.c b/src/event/modules/ngx_devpoll_module.c index 13971ad4d..e2e7750f1 100644 --- a/src/event/modules/ngx_devpoll_module.c +++ b/src/event/modules/ngx_devpoll_module.c @@ -16,6 +16,7 @@ #error "/dev/poll is not supported on this platform" #endif +static int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags); /* STUB */ #define DEVPOLL_NCHANGES 512 @@ -137,7 +138,7 @@ int ngx_devpoll_del_event(ngx_event_t *ev, int event, u_int flags) } -int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags) +static int ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags) { int n; ngx_connection_t *c; @@ -192,7 +193,6 @@ int ngx_devpoll_process_events(ngx_log_t *log) int events, n, i; ngx_msec_t timer, delta; ngx_err_t err; - ngx_event_t *ev; ngx_connection_t *c; struct dvpoll dvp; struct timeval tv; @@ -233,7 +233,7 @@ int ngx_devpoll_process_events(ngx_log_t *log) nchanges = 0; - if (timer != INFTIM) { + if ((int) timer != INFTIM) { gettimeofday(&tv, NULL); delta = tv.tv_sec * 1000 + tv.tv_usec / 1000 - delta; @@ -305,7 +305,7 @@ int ngx_devpoll_process_events(ngx_log_t *log) } } - if (timer != INFTIM) { + if ((int) timer != INFTIM) { ngx_event_expire_timers(delta); } diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c new file mode 100644 index 000000000..f474c4c61 --- /dev/null +++ b/src/event/modules/ngx_iocp_module.c @@ -0,0 +1,137 @@ + +#include <ngx_config.h> + +#include <ngx_core.h> +#include <ngx_log.h> +#include <ngx_errno.h> +#include <ngx_time.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_event_timer.h> + +#include <ngx_iocp_module.h> + + +int ngx_iocp_threads = 0;; + + +static HANDLE iocp; +static ngx_event_t *timer_queue; + + +int ngx_iocp_init(int max_connections, ngx_log_t *log) +{ + iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, + NULL, 0, ngx_iocp_threads); + + if (iocp == NULL) { + ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, + "CreateIoCompletionPort() failed"); + return NGX_ERROR; + } + + timer_queue = ngx_event_init_timer(log); + if (timer_queue == NULL) { + return NGX_ERROR; + } + + ngx_event_actions.process = ngx_iocp_process_events; + + ngx_event_flags = NGX_HAVE_AIO_EVENT|NGX_HAVE_IOCP_EVENT; + + return NGX_OK; +} + + +int ngx_iocp_add_event(ngx_event_t *ev) +{ + ngx_connection_t *c; + + c = (ngx_connection_t *) ev->data; + + ngx_log_debug(ev->log, "iocp: %d, %08x:%08x" _ c->fd _ ev _ &ev->ovlp); + + if (CreateIoCompletionPort((HANDLE) c->fd, iocp, (DWORD) ev, 0) == NULL) { + ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, + "CreateIoCompletionPort() failed"); + return NGX_ERROR; + } + + return NGX_OK; +} + + +int ngx_iocp_process_events(ngx_log_t *log) +{ + int rc; + size_t bytes; + ngx_err_t err; + ngx_msec_t timer, delta; + ngx_event_t *ev, *e; + ngx_event_ovlp_t *ovlp; + + ngx_log_debug(log, "iocp"); + + timer = ngx_event_find_timer(); + + if (timer) { + delta = ngx_msec(); + + } else { + timer = INFINITE; + delta = 0; + } + + ngx_log_debug(log, "iocp timer: %d" _ timer); + +#if 1 + rc = GetQueuedCompletionStatus(iocp, &bytes, (LPDWORD) &e, + (LPOVERLAPPED *) &ovlp, timer); + ngx_log_debug(log, "iocp: %d, %d:%08x:%08x" _ rc _ bytes _ e _ ovlp); + if (rc == 0) { +#else + if (GetQueuedCompletionStatus(iocp, &bytes, (LPDWORD) &e, + (LPOVERLAPPED *) &ovlp, timer) == 0) { +#endif + err = ngx_errno; + + if (ovlp == NULL) { + if (err != WAIT_TIMEOUT) { + ngx_log_error(NGX_LOG_ALERT, log, err, + "GetQueuedCompletionStatus() failed"); + + return NGX_ERROR; + } + + } else { + ovlp->error = err; + } + } + + if (timer != INFINITE) { + delta = ngx_msec() - delta; + } + + if (ovlp) { + ev = ovlp->event; + +ngx_log_debug(log, "iocp ev: %08x" _ ev); + + if (ev == e) { + ev->ready = 1; + ev->available = bytes; + } + +ngx_log_debug(log, "iocp ev: %08x" _ ev->event_handler); + + if (ev->event_handler(ev) == NGX_ERROR) { + ev->close_handler(ev); + } + } + + if (timer != INFINITE) { + ngx_event_expire_timers(delta); + } + + return NGX_OK; +} diff --git a/src/event/modules/ngx_iocp_module.h b/src/event/modules/ngx_iocp_module.h new file mode 100644 index 000000000..d7f2f5192 --- /dev/null +++ b/src/event/modules/ngx_iocp_module.h @@ -0,0 +1,15 @@ +#ifndef _NGX_IOCP_MODULE_H_INCLUDED_ +#define _NGX_IOCP_MODULE_H_INCLUDED_ + + +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_event.h> + + +int ngx_iocp_init(int max_connections, ngx_log_t *log); +int ngx_iocp_add_event(ngx_event_t *ev); +int ngx_iocp_process_events(ngx_log_t *log); + + +#endif /* _NGX_IOCP_MODULE_H_INCLUDED_ */ diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index abf3b6e61..da8450e62 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -23,7 +23,11 @@ /* should be per-thread */ +#if 1 +int kq; +#else static int kq; +#endif static struct kevent *change_list, *event_list; static unsigned int nchanges; static int nevents; @@ -63,7 +67,12 @@ int ngx_kqueue_init(int max_connections, ngx_log_t *log) ngx_event_actions.process = ngx_kqueue_process_events; ngx_event_flags = NGX_HAVE_LEVEL_EVENT - |NGX_HAVE_ONESHOT_EVENT|NGX_HAVE_CLEAR_EVENT; + |NGX_HAVE_ONESHOT_EVENT +#if (HAVE_AIO_EVENT) + |NGX_HAVE_AIO_EVENT; +#else + |NGX_HAVE_CLEAR_EVENT; +#endif #endif return NGX_OK; @@ -221,10 +230,19 @@ int ngx_kqueue_process_events(ngx_log_t *log) for (i = 0; i < events; i++) { #if (NGX_DEBUG_EVENT) - ngx_log_debug(log, "kevent: %d: ft:%d f:%08x ff:%08x d:%d ud:%08x" _ - event_list[i].ident _ event_list[i].filter _ - event_list[i].flags _ event_list[i].fflags _ - event_list[i].data _ event_list[i].udata); + if (event_list[i].ident > 0x8000000) { + ngx_log_debug(log, + "kevent: %08x: ft:%d f:%08x ff:%08x d:%d ud:%08x" _ + event_list[i].ident _ event_list[i].filter _ + event_list[i].flags _ event_list[i].fflags _ + event_list[i].data _ event_list[i].udata); + } else { + ngx_log_debug(log, + "kevent: %d: ft:%d f:%08x ff:%08x d:%d ud:%08x" _ + event_list[i].ident _ event_list[i].filter _ + event_list[i].flags _ event_list[i].fflags _ + event_list[i].data _ event_list[i].udata); + } #endif if (event_list[i].flags & EV_ERROR) { @@ -243,7 +261,6 @@ int ngx_kqueue_process_events(ngx_log_t *log) case EVFILT_READ: case EVFILT_WRITE: - ev->ready = 1; ev->available = event_list[i].data; if (event_list[i].flags & EV_EOF) { @@ -255,12 +272,18 @@ int ngx_kqueue_process_events(ngx_log_t *log) ngx_del_timer(ev); } + /* fall through */ + + case EVFILT_AIO: + ev->ready = 1; + if (ev->event_handler(ev) == NGX_ERROR) { ev->close_handler(ev); } break; + default: ngx_log_error(NGX_LOG_ALERT, log, 0, "unknown kevent filter %d" _ event_list[i].filter); diff --git a/src/event/modules/ngx_kqueue_module.h b/src/event/modules/ngx_kqueue_module.h index c56192018..568476ed0 100644 --- a/src/event/modules/ngx_kqueue_module.h +++ b/src/event/modules/ngx_kqueue_module.h @@ -14,4 +14,10 @@ void ngx_kqueue_add_timer(ngx_event_t *ev, ngx_msec_t timer); int ngx_kqueue_process_events(ngx_log_t *log); +#if 1 +extern int kq; +#endif + + + #endif /* _NGX_KQUEUE_MODULE_H_INCLUDED_ */ diff --git a/src/event/modules/ngx_poll_module.c b/src/event/modules/ngx_poll_module.c index e27031cfc..939340e46 100644 --- a/src/event/modules/ngx_poll_module.c +++ b/src/event/modules/ngx_poll_module.c @@ -13,7 +13,7 @@ /* should be per-thread */ static struct pollfd *event_list; -static unsigned int nevents; +static u_int nevents; static ngx_event_t **event_index; static ngx_event_t **ready_index; @@ -140,7 +140,8 @@ int ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags) int ngx_poll_process_events(ngx_log_t *log) { - int i, ready, nready, found; + int ready, found; + u_int i, nready; ngx_msec_t timer, delta; ngx_err_t err; ngx_event_t *ev; @@ -172,7 +173,7 @@ int ngx_poll_process_events(ngx_log_t *log) ngx_log_debug(log, "poll ready %d" _ ready); - if (timer != INFTIM) { + if ((int) timer != INFTIM) { delta = ngx_msec() - delta; } else { @@ -256,7 +257,7 @@ int ngx_poll_process_events(ngx_log_t *log) ngx_log_error(NGX_LOG_ALERT, log, 0, "poll ready != events"); } - if (timer != INFTIM) { + if ((int) timer != INFTIM) { ngx_event_expire_timers(delta); } diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c index fe461c3f5..50d030c74 100644 --- a/src/event/modules/ngx_select_module.c +++ b/src/event/modules/ngx_select_module.c @@ -23,7 +23,7 @@ static int max_write; static int max_fd; #endif -static int nevents; +static u_int nevents; static ngx_event_t **event_index; static ngx_event_t **ready_index; @@ -177,8 +177,8 @@ int ngx_select_del_event(ngx_event_t *ev, int event, u_int flags) int ngx_select_process_events(ngx_log_t *log) { - int ready, found, nready; - u_int i; + int ready, found; + u_int i, nready; ngx_msec_t timer, delta; ngx_event_t *ev; ngx_connection_t *c; diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c index 63cf999cd..063385dba 100644 --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -12,16 +12,24 @@ #include <ngx_event_accept.h> #include <ngx_select_module.h> + #if (HAVE_POLL) #include <ngx_poll_module.h> #endif + #if (HAVE_DEVPOLL) #include <ngx_devpoll_module.h> #endif + #if (HAVE_KQUEUE) #include <ngx_kqueue_module.h> #endif +#if (HAVE_IOCP) +#include <ngx_event_acceptex.h> +#include <ngx_iocp_module.h> +#endif + ngx_connection_t *ngx_connections; ngx_event_t *ngx_read_events, *ngx_write_events; @@ -68,7 +76,10 @@ static int (*ngx_event_init[]) (int max_connections, ngx_log_t *log) = { ngx_devpoll_init, #endif #if (HAVE_KQUEUE) - ngx_kqueue_init + ngx_kqueue_init, +#endif +#if (HAVE_IOCP) + ngx_iocp_init #endif }; @@ -86,6 +97,10 @@ void ngx_pre_thread(ngx_array_t *ls, ngx_pool_t *pool, ngx_log_t *log) /* STUB */ int max_connections = 512; +#if (HAVE_IOCP) + ngx_event_type = NGX_IOCP_EVENT; +#endif + if (ngx_init_events(max_connections, log) == NGX_ERROR) { exit(1); } @@ -127,16 +142,38 @@ void ngx_pre_thread(ngx_array_t *ls, ngx_pool_t *pool, ngx_log_t *log) ngx_memcpy(ev->log, c->log, sizeof(ngx_log_t)); c->read = ev; ev->data = c; - ev->event_handler = &ngx_event_accept; - ev->listening = 1; ev->index = NGX_INVALID_INDEX; +#if 0 + ev->listening = 1; +#endif ev->available = 0; #if (HAVE_DEFERRED_ACCEPT) ev->deferred_accept = s[i].deferred_accept; #endif + +#if (HAVE_IOCP) + + if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) { + ev->event_handler = &ngx_event_acceptex; + + if (ngx_iocp_add_event(ev) == NGX_ERROR) { + return NGX_ERROR; + } + + ngx_event_post_acceptex(&s[i], 1); + + } else { + ev->event_handler = &ngx_event_accept; + } + +#else + + ev->event_handler = &ngx_event_accept; ngx_add_event(ev, NGX_READ_EVENT, 0); + +#endif } } diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h index a91881067..d81be38fe 100644 --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -15,6 +15,15 @@ typedef struct ngx_event_s ngx_event_t; +#if (HAVE_IOCP) +typedef struct { + WSAOVERLAPPED ovlp; + ngx_event_t *event; + int error; +} ngx_event_ovlp_t; +#endif + + struct ngx_event_s { void *data; @@ -45,11 +54,11 @@ struct ngx_event_s { /* otherwise: */ /* accept: 1 if accept many, 0 otherwise */ - /* flags - int are probably faster on write then bits ??? */ - unsigned oneshot:1; +#if 0 unsigned listening:1; +#endif unsigned write:1; unsigned active:1; @@ -66,11 +75,24 @@ struct ngx_event_s { #if (HAVE_DEFERRED_ACCEPT) unsigned deferred_accept:1; #endif + #if (HAVE_KQUEUE) unsigned eof:1; int error; #endif + +#if (HAVE_AIO) + +#if (HAVE_IOCP) + ngx_event_ovlp_t ovlp; +#else + struct aiocb aiocb; +#endif + +#endif + + #if 0 void *thr_ctx; /* event thread context if $(CC) doesn't understand __thread declaration @@ -94,6 +116,10 @@ typedef enum { #if (HAVE_KQUEUE) NGX_KQUEUE_EVENT, #endif +#if (HAVE_IOCP) + NGX_IOCP_EVENT, +#endif + NGX_DUMMY_EVENT /* avoid comma at end of enumerator list */ } ngx_event_type_e ; typedef struct { @@ -125,6 +151,10 @@ typedef struct { /* No need to add or delete event filters - overlapped, aio_read, aioread */ #define NGX_HAVE_AIO_EVENT 16 +/* Need to add socket or halde only once - i/o completion port. + It also requires to set HAVE_AIO_EVENT and NGX_HAVE_AIO_EVENT */ +#define NGX_HAVE_IOCP_EVENT 32 + /* Event filter is deleted before closing file. Has no meaning for select, poll, epoll. @@ -187,41 +217,24 @@ typedef struct { #define ngx_process_events ngx_event_actions.process #define ngx_add_event ngx_event_actions.add #define ngx_del_event ngx_event_actions.del + #if 0 #define ngx_add_timer ngx_event_actions.timer #else #define ngx_add_timer ngx_event_add_timer #endif -#define ngx_event_recv ngx_event_recv_core +#if (HAVE_IOCP_EVENT) +#define ngx_event_recv ngx_event_wsarecv +#elif (HAVE_AIO_EVENT) +#define ngx_event_recv ngx_event_aio_read +#else +#define ngx_event_recv ngx_event_recv_core #endif -#define ngx_del_timer ngx_event_del_timer - - -#if 0 -ngx_inline static void ngx_del_timer(ngx_event_t *ev) -{ -#if (NGX_DEBUG_EVENT) - /* STUB - we can not cast (ngx_connection_t *) here */ - ngx_log_debug(ev->log, "del timer: %d" _ *(int *)(ev->data)); #endif - if (ev->timer_prev) { - ev->timer_prev->timer_next = ev->timer_next; - } - - if (ev->timer_next) { - ev->timer_next->timer_delta += ev->timer_delta; - ev->timer_next->timer_prev = ev->timer_prev; - ev->timer_next = NULL; - } - - if (ev->timer_prev) { - ev->timer_prev = NULL; - } -} -#endif +#define ngx_del_timer ngx_event_del_timer @@ -236,6 +249,10 @@ extern int ngx_event_flags; #endif +ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size); +int ngx_event_close_connection(ngx_event_t *ev); + + void ngx_pre_thread(ngx_array_t *ls, ngx_pool_t *pool, ngx_log_t *log); void ngx_worker(ngx_log_t *log); diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c index 14422afdf..2d64c0d9b 100644 --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -31,7 +31,9 @@ int ngx_event_accept(ngx_event_t *ev) ev->ready = 0; +#if 0 /* DEBUG */ ev->available++; +#endif do { ngx_test_null(pool, ngx_create_pool(ls->pool_size, ev->log), NGX_OK); @@ -55,13 +57,40 @@ int ngx_event_accept(ngx_event_t *ev) return NGX_OK; } -#if !(HAVE_INHERITED_NONBLOCK) + +#if (HAVE_INHERITED_NONBLOCK) + +#if (HAVE_AIO_EVENT) + if ((ngx_event_flags & NGX_HAVE_AIO_EVENT)) { + if (ngx_blocking(s) == -1) { + ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, + ngx_blocking_n " %s failed", ls->addr_text.data); + return NGX_OK; + } + } +#endif + +#else /* !HAVE_INHERITED_NONBLOCK */ + +#if (HAVE_AIO_EVENT) + if (!(ngx_event_flags & NGX_HAVE_AIO_EVENT)) { + if (ngx_nonblocking(s) == -1) { + ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, + ngx_nonblocking_n " %s failed", ls->addr_text.data); + return NGX_OK; + } + } +#else if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " %s failed", ls->addr_text.data); + return NGX_OK; } #endif +#endif /* HAVE_INHERITED_NONBLOCK */ + + rev = &ngx_read_events[s]; wev = &ngx_write_events[s]; c = &ngx_connections[s]; @@ -88,9 +117,15 @@ int ngx_event_accept(ngx_event_t *ev) c->fd = s; c->unexpected_eof = 1; wev->write = 1; - wev->ready = 1; - wev->timer = rev->timer = 10000; +#if (HAVE_AIO_EVENT) + if (!(ngx_event_flags & NGX_HAVE_AIO_EVENT)) { + wev->ready = 1; + } +#endif + + /* STUB ? */ wev->timer = rev->timer = 10000; + wev->timer_handler = rev->timer_handler = ngx_event_close_connection; wev->close_handler = rev->close_handler = ngx_event_close_connection; diff --git a/src/event/ngx_event_acceptex.c b/src/event/ngx_event_acceptex.c new file mode 100644 index 000000000..83b3c42fc --- /dev/null +++ b/src/event/ngx_event_acceptex.c @@ -0,0 +1,150 @@ + +#include <ngx_config.h> + +#include <ngx_core.h> +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_listen.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_event_close.h> +#include <ngx_iocp_module.h> + +#include <ngx_event_acceptex.h> + + + +/* This function should always return NGX_OK even there are some failures + because if we return NGX_ERROR then listening socket would be closed */ + +int ngx_event_acceptex(ngx_event_t *ev) +{ + ngx_connection_t *c; + + c = (ngx_connection_t *) ev->data; + + if (ev->ovlp.error) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ev->ovlp.error, + "AcceptEx(%s) falied", c->addr_text.data); + return NGX_OK; + } + + GetAcceptExSockaddrs(c->data, 0, + c->socklen + 16, c->socklen + 16, + &c->local_sockaddr, &c->local_socklen, + &c->sockaddr, &c->socklen); + + ngx_event_post_acceptex(c->listening, 1); + + /* STUB: InterlockedInc() */ + c->number = ngx_connection_counter++; + + c->handler(c); + + return NGX_OK; + +} + + +int ngx_event_post_acceptex(ngx_listen_t *ls, int n) +{ + int i; + u_int rcvd; + ngx_err_t err; + ngx_pool_t *pool; + ngx_event_t *rev, *wev; + ngx_socket_t s; + ngx_connection_t *c; + + for (i = 0; i < n; i++) { + + /* TODO: look up reused sockets */ + + ngx_log_debug(ls->log, "socket: %x" _ ls->flags); + + s = ngx_socket(ls->family, ls->type, ls->protocol, ls->flags); + + if (s == -1) { + ngx_log_error(NGX_LOG_ALERT, ls->log, ngx_socket_errno, + ngx_socket_n " for AcceptEx(%s) falied", + ls->addr_text.data); + + return NGX_ERROR; + } + + ngx_test_null(pool, ngx_create_pool(ls->pool_size, ls->log), NGX_ERROR); + + rev = &ngx_read_events[s]; + wev = &ngx_write_events[s]; + c = &ngx_connections[s]; + + ngx_memzero(rev, sizeof(ngx_event_t)); + ngx_memzero(wev, sizeof(ngx_event_t)); + ngx_memzero(c, sizeof(ngx_connection_t)); + + c->pool = pool; + + rev->index = wev->index = NGX_INVALID_INDEX; + + rev->ovlp.event = rev; + wev->ovlp.event = wev; + + rev->data = wev->data = c; + c->read = rev; + c->write = wev; + + c->family = ls->family; + c->socklen = ls->socklen; + c->addr = ls->addr; + c->addr_text_max_len = ls->addr_text_max_len; + c->post_accept_timeout = ls->post_accept_timeout; + + c->listening = ls; + c->fd = s; + + c->unexpected_eof = 1; + wev->write = 1; + + c->handler = ls->handler; + rev->event_handler = ngx_event_acceptex; + + wev->timer_handler = rev->timer_handler = ngx_event_close_connection; + wev->close_handler = rev->close_handler = ngx_event_close_connection; + + c->ctx = ls->ctx; + c->servers = ls->servers; + + ngx_test_null(c->data, ngx_palloc(pool, 2 * (c->socklen + 16)), + NGX_ERROR); + ngx_test_null(c->local_sockaddr, ngx_palloc(pool, c->socklen), + NGX_ERROR); + ngx_test_null(c->sockaddr, ngx_palloc(pool, c->socklen), + NGX_ERROR); + + ngx_test_null(c->log, ngx_palloc(c->pool, sizeof(ngx_log_t)), + NGX_ERROR); + ngx_memcpy(c->log, ls->log, sizeof(ngx_log_t)); + rev->log = wev->log = c->log; + + if (ngx_iocp_add_event(rev) == NGX_ERROR) { + return NGX_ERROR; + } + + if (AcceptEx(ls->fd, s, c->data, 0, + c->socklen + 16, c->socklen + 16, + &rcvd, (LPOVERLAPPED) &rev->ovlp) == 0) { + + err = ngx_socket_errno; + if (err == WSA_IO_PENDING) { + return NGX_OK; + } + + ngx_log_error(NGX_LOG_ALERT, ls->log, err, + "AcceptEx(%s) falied", ls->addr_text.data); + + return NGX_ERROR; + } + } + + return NGX_OK; +} diff --git a/src/event/ngx_event_acceptex.h b/src/event/ngx_event_acceptex.h new file mode 100644 index 000000000..8d55af1cc --- /dev/null +++ b/src/event/ngx_event_acceptex.h @@ -0,0 +1,13 @@ +#ifndef _NGX_EVENT_ACCEPTEX_H_INCLUDED_ +#define _NGX_EVENT_ACCEPTEX_H_INCLUDED_ + + +#include <ngx_listen.h> +#include <ngx_event.h> + + +int ngx_event_acceptex(ngx_event_t *ev); +int ngx_event_post_acceptex(ngx_listen_t *ls, int n); + + +#endif /* _NGX_EVENT_ACCEPTEX_H_INCLUDED_ */ diff --git a/src/event/ngx_event_aio_read.c b/src/event/ngx_event_aio_read.c new file mode 100644 index 000000000..4561cf4bf --- /dev/null +++ b/src/event/ngx_event_aio_read.c @@ -0,0 +1,113 @@ + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_errno.h> +#include <ngx_log.h> +#include <ngx_recv.h> +#include <ngx_connection.h> +#include <ngx_event.h> + +#if (HAVE_KQUEUE) +#include <ngx_kqueue_module.h> +#endif + + +/* + The data is ready - 3 syscalls: + aio_read(), aio_error(), aio_return() + The data is not ready - 4 (kqueue) or 5 syscalls: + aio_read(), aio_error(), notifiction, + aio_error(), aio_return() + aio_cancel(), aio_error() +*/ + +ssize_t ngx_event_aio_read(ngx_connection_t *c, char *buf, size_t size) +{ + int rc, first, canceled; + ngx_event_t *ev; + + ev = c->read; + + canceled = 0; + + if (ev->timedout) { + ngx_set_socket_errno(NGX_ETIMEDOUT); + ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_read() timed out"); + + rc = aio_cancel(c->fd, &ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + "aio_cancel() failed"); + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_cancel: %d" _ rc); + + canceled = 1; + + ev->ready = 1; + } + + first = 0; + + if (!ev->ready) { + ngx_memzero(&ev->aiocb, sizeof(struct aiocb)); + + ev->aiocb.aio_fildes = c->fd; + ev->aiocb.aio_buf = buf; + ev->aiocb.aio_nbytes = size; + +#if (HAVE_KQUEUE) + ev->aiocb.aio_sigevent.sigev_notify_kqueue = kq; + ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev; +#endif + + if (aio_read(&ev->aiocb) == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + "aio_read() failed"); + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_read: OK"); + + ev->active = 1; + first = 1; + } + + ev->ready = 0; + + rc = aio_error(&ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed"); + return NGX_ERROR; + } + + if (rc != 0) { + if (rc == NGX_EINPROGRESS) { + if (!first) { + ngx_log_error(NGX_LOG_CRIT, ev->log, rc, + "aio_read() still in progress"); + } + return NGX_AGAIN; + } + + if (rc == NGX_ECANCELED && canceled) { + return NGX_ERROR; + } + + ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_read() failed"); + return NGX_ERROR; + } + + rc = aio_return(&ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed"); + + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_read: %d" _ rc); + + return rc; +} diff --git a/src/event/ngx_event_aio_write.c b/src/event/ngx_event_aio_write.c new file mode 100644 index 000000000..00da4f4cc --- /dev/null +++ b/src/event/ngx_event_aio_write.c @@ -0,0 +1,115 @@ + +#include <ngx_config.h> +#include <ngx_core.h> +#include <ngx_errno.h> +#include <ngx_log.h> +#include <ngx_recv.h> +#include <ngx_connection.h> + +#if (HAVE_KQUEUE) +#include <ngx_kqueue_module.h> +#endif + + +/* + The data is ready - 3 syscalls: + aio_write(), aio_error(), aio_return() + The data is not ready - 4 (kqueue) or 5 syscalls: + aio_write(), aio_error(), notifiction, + aio_error(), aio_return() + aio_cancel(), aio_error() +*/ + +ssize_t ngx_event_aio_write(ngx_connection_t *c, char *buf, size_t size) +{ + int rc, first, canceled; + ngx_event_t *ev; + + ev = c->write; + + canceled = 0; + +ngx_log_debug(ev->log, "aio: ev->ready: %d" _ ev->ready); +ngx_log_debug(ev->log, "aio: aiocb: %08x" _ &ev->aiocb); + + if (ev->timedout) { + ngx_set_socket_errno(NGX_ETIMEDOUT); + ngx_log_error(NGX_LOG_ERR, ev->log, 0, "aio_write() timed out"); + + rc = aio_cancel(c->fd, &ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + "aio_cancel() failed"); + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_cancel: %d" _ rc); + + canceled = 1; + + ev->ready = 1; + } + + first = 0; + + if (!ev->ready) { + ngx_memzero(&ev->aiocb, sizeof(struct aiocb)); + + ev->aiocb.aio_fildes = c->fd; + ev->aiocb.aio_buf = buf; + ev->aiocb.aio_nbytes = size; + +#if (HAVE_KQUEUE) + ev->aiocb.aio_sigevent.sigev_notify_kqueue = kq; + ev->aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + ev->aiocb.aio_sigevent.sigev_value.sigval_ptr = ev; +#endif + + if (aio_write(&ev->aiocb) == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, + "aio_write() failed"); + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_write: OK"); + + ev->active = 1; + first = 1; + } + + ev->ready = 0; + + rc = aio_error(&ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_error() failed"); + return NGX_ERROR; + } + + if (rc != 0) { + if (rc == NGX_EINPROGRESS) { + if (!first) { + ngx_log_error(NGX_LOG_CRIT, ev->log, rc, + "aio_write() still in progress"); + } + return NGX_AGAIN; + } + + if (rc == NGX_ECANCELED && canceled) { + return NGX_ERROR; + } + + ngx_log_error(NGX_LOG_CRIT, ev->log, rc, "aio_write() failed"); + return NGX_ERROR; + } + + rc = aio_return(&ev->aiocb); + if (rc == -1) { + ngx_log_error(NGX_LOG_CRIT, ev->log, ngx_errno, "aio_return() failed"); + + return NGX_ERROR; + } + + ngx_log_debug(ev->log, "aio_write: %d" _ rc); + + return rc; +} diff --git a/src/event/ngx_event_close.c b/src/event/ngx_event_close.c index c829c7172..8c8fd1bef 100644 --- a/src/event/ngx_event_close.c +++ b/src/event/ngx_event_close.c @@ -27,7 +27,7 @@ int ngx_event_close_connection(ngx_event_t *ev) ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); if ((rc = ngx_close_socket(c->fd)) == -1) - ngx_log_error(NGX_LOG_ERR, c->log, ngx_socket_errno, + ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, "ngx_event_close: close failed"); c->fd = -1; diff --git a/src/event/ngx_event_recv.c b/src/event/ngx_event_recv.c index 4b01a4c68..4fa95c3ae 100644 --- a/src/event/ngx_event_recv.c +++ b/src/event/ngx_event_recv.c @@ -6,7 +6,7 @@ #include <ngx_recv.h> #include <ngx_connection.h> -int ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size) +ssize_t ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size) { int n; ngx_err_t err; @@ -20,9 +20,23 @@ int ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size) #if (HAVE_KQUEUE) ngx_log_debug(c->log, "ngx_event_recv: eof:%d, avail:%d, err:%d" _ c->read->eof _ c->read->available _ c->read->error); -#if !(USE_KQUEUE) - if (ngx_event_type == NGX_KQUEUE_EVENT) #endif + +#if (USE_KQUEUE) + + if (c->read->eof && c->read->available == 0) { + if (c->read->error) { + ngx_log_error(NGX_LOG_ERR, c->log, c->read->error, + "recv() failed"); + return NGX_ERROR; + } + + return 0; + } + +#elif (HAVE_KQUEUE) + + if (ngx_event_type == NGX_KQUEUE_EVENT) { if (c->read->eof && c->read->available == 0) { if (c->read->error) { ngx_log_error(NGX_LOG_ERR, c->log, c->read->error, @@ -32,6 +46,8 @@ int ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size) return 0; } + } + #endif n = ngx_recv(c->fd, buf, size, 0); @@ -48,11 +64,16 @@ int ngx_event_recv_core(ngx_connection_t *c, char *buf, size_t size) return NGX_ERROR; } -#if (HAVE_KQUEUE) -#if !(USE_KQUEUE) - if (ngx_event_type == NGX_KQUEUE_EVENT) -#endif +#if (USE_KQUEUE) + + c->read->available -= n; + +#elif (HAVE_KQUEUE) + + if (ngx_event_type == NGX_KQUEUE_EVENT) { c->read->available -= n; + } + #endif return n; diff --git a/src/event/ngx_event_timer.h b/src/event/ngx_event_timer.h index 7f1816aa6..227f702c5 100644 --- a/src/event/ngx_event_timer.h +++ b/src/event/ngx_event_timer.h @@ -15,8 +15,6 @@ int ngx_event_find_timer(void); void ngx_event_expire_timers(ngx_msec_t timer); -extern ngx_event_t *ngx_timer_queue; - ngx_inline static void ngx_event_del_timer(ngx_event_t *ev) { @@ -41,4 +39,4 @@ ngx_inline static void ngx_event_del_timer(ngx_event_t *ev) } -#endif _NGX_EVENT_TIMER_H_INCLUDED_ +#endif /* _NGX_EVENT_TIMER_H_INCLUDED_ */ diff --git a/src/event/ngx_event_write.c b/src/event/ngx_event_write.c index 35949ec39..9e7670c5d 100644 --- a/src/event/ngx_event_write.c +++ b/src/event/ngx_event_write.c @@ -11,11 +11,11 @@ #include <ngx_event_write.h> -ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, - off_t flush) +ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, off_t flush) { - int rc; - char *last; + int rc, i, last; + u_int flags; + char *prev; off_t sent; ngx_iovec_t *iov; ngx_array_t *header, *trailer; @@ -24,6 +24,7 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, ch = in; file = NULL; + last = 0; ngx_test_null(header, ngx_create_array(c->pool, 10, sizeof(ngx_iovec_t)), (ngx_chain_t *) -1); @@ -36,12 +37,12 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, trailer->nelts = 0; if (ch->hunk->type & NGX_HUNK_IN_MEMORY) { - last = NULL; + prev = NULL; iov = NULL; while (ch && (ch->hunk->type & NGX_HUNK_IN_MEMORY)) { - if (last == ch->hunk->pos.mem) { + if (prev == ch->hunk->pos.mem) { iov->ngx_iov_len += ch->hunk->last.mem - ch->hunk->pos.mem; } else { @@ -49,7 +50,11 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, (ngx_chain_t *) -1); iov->ngx_iov_base = ch->hunk->pos.mem; iov->ngx_iov_len = ch->hunk->last.mem - ch->hunk->pos.mem; - last = ch->hunk->last.mem; + prev = ch->hunk->last.mem; + } + + if (ch->hunk->type & NGX_HUNK_LAST) { + last = 1; } ch = ch->next; @@ -59,6 +64,10 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, if (ch && (ch->hunk->type & NGX_HUNK_FILE)) { file = ch->hunk; ch = ch->next; + + if (ch->hunk->type & NGX_HUNK_LAST) { + last = 1; + } } #if (HAVE_MAX_SENDFILE_IOVEC) @@ -68,12 +77,12 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, } else { #endif if (ch && ch->hunk->type & NGX_HUNK_IN_MEMORY) { - last = NULL; + prev = NULL; iov = NULL; while (ch && (ch->hunk->type & NGX_HUNK_IN_MEMORY)) { - if (last == ch->hunk->pos.mem) { + if (prev == ch->hunk->pos.mem) { iov->ngx_iov_len += ch->hunk->last.mem - ch->hunk->pos.mem; @@ -83,7 +92,11 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, iov->ngx_iov_base = ch->hunk->pos.mem; iov->ngx_iov_len = ch->hunk->last.mem - ch->hunk->pos.mem; - last = ch->hunk->last.mem; + prev = ch->hunk->last.mem; + } + + if (ch->hunk->type & NGX_HUNK_LAST) { + last = 1; } ch = ch->next; @@ -91,19 +104,47 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, } if (file) { - rc = ngx_sendfile(c->fd, + flags = ngx_sendfile_flags; +#if (HAVE_SENDFILE_DISCONNECT) + if (last && c->close) { + flags |= HAVE_SENDFILE_DISCONNECT; + } +#endif + rc = ngx_sendfile(c, (ngx_iovec_t *) header->elts, header->nelts, file->file->fd, file->pos.file, (size_t) (file->last.file - file->pos.file), (ngx_iovec_t *) trailer->elts, trailer->nelts, - &sent, c->log); + &sent, flags); + +#if (HAVE_AIO_EVENT) && !(HAVE_IOCP_EVENT) + } else if (ngx_event_flags & NGX_HAVE_AIO_EVENT) { + + sent = 0; + rc = NGX_AGAIN; + iov = (ngx_iovec_t *) header->elts; + for (i = 0; i < header->nelts; i++) { + rc = ngx_event_aio_write(c, iov[i].ngx_iov_base, + iov[i].ngx_iov_len); + + if (rc > 0) { + sent += rc; + } else { + break; + } + + if (rc < (int) iov->ngx_iov_len) { + break; + } + } +#endif } else { rc = ngx_sendv(c, (ngx_iovec_t *) header->elts, header->nelts); sent = rc > 0 ? rc: 0; #if (NGX_DEBUG_EVENT_WRITE) - ngx_log_debug(c->log, "sendv: " QD_FMT _ sent); + ngx_log_debug(c->log, "sendv: " OFF_FMT _ sent); #endif } #if (HAVE_MAX_SENDFILE_IOVEC) @@ -118,7 +159,7 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, for (ch = in; ch; ch = ch->next) { #if (NGX_DEBUG_EVENT_WRITE) - ngx_log_debug(c->log, "event write: %x " QX_FMT " " QD_FMT _ + ngx_log_debug(c->log, "event write: %x " QX_FMT " " OFF_FMT _ ch->hunk->type _ ch->hunk->pos.file _ ch->hunk->last.file - ch->hunk->pos.file); @@ -129,7 +170,7 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, ch->hunk->pos.file = ch->hunk->last.file; #if (NGX_DEBUG_EVENT_WRITE) - ngx_log_debug(c->log, "event write: " QX_FMT " 0 " QD_FMT _ + ngx_log_debug(c->log, "event write: " QX_FMT " 0 " OFF_FMT _ ch->hunk->pos.file _ sent); #endif @@ -144,7 +185,7 @@ ngx_chain_t *ngx_event_write(ngx_connection_t *c, ngx_chain_t *in, ch->hunk->pos.file += sent; #if (NGX_DEBUG_EVENT_WRITE) - ngx_log_debug(c->log, "event write: " QX_FMT " " QD_FMT _ + ngx_log_debug(c->log, "event write: " QX_FMT " " OFF_FMT _ ch->hunk->pos.file _ ch->hunk->last.file - ch->hunk->pos.file); #endif diff --git a/src/event/ngx_event_wsarecv.c b/src/event/ngx_event_wsarecv.c new file mode 100644 index 000000000..20c0c1bd9 --- /dev/null +++ b/src/event/ngx_event_wsarecv.c @@ -0,0 +1,97 @@ + +#include <ngx_config.h> + +#include <ngx_core.h> +#include <ngx_errno.h> +#include <ngx_log.h> +#include <ngx_connection.h> +#include <ngx_event.h> + + +ssize_t ngx_event_wsarecv(ngx_connection_t *c, char *buf, size_t size) +{ + int rc; + u_int flags; + size_t bytes; + ngx_err_t err; + WSABUF wsabuf[1]; + ngx_event_t *ev; + LPWSAOVERLAPPED_COMPLETION_ROUTINE handler; + + ev = c->read; + +/* DEBUG */ bytes = 0; + + if (ev->timedout) { + ngx_set_socket_errno(NGX_ETIMEDOUT); + ngx_log_error(NGX_LOG_ERR, ev->log, 0, "WSARecv() timed out"); + + return NGX_ERROR; + } + + if (ev->ready) { + ev->ready = 0; + +#if (HAVE_IOCP_EVENT) /* iocp */ + + if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) { + if (ev->ovlp.error) { + ngx_log_error(NGX_LOG_ERR, c->log, ev->ovlp.error, + "WSARecv() failed"); + return NGX_ERROR; + } + + return ev->available; + } + +#endif + + if (WSAGetOverlappedResult(c->fd, (LPWSAOVERLAPPED) &ev->ovlp, + &bytes, 0, NULL) == 0) { + err = ngx_socket_errno; + ngx_log_error(NGX_LOG_CRIT, ev->log, err, + "WSARecv() or WSAGetOverlappedResult() failed"); + + return NGX_ERROR; + } + + return bytes; + } + + ngx_memzero(&ev->ovlp, sizeof(WSAOVERLAPPED)); + wsabuf[0].buf = buf; + wsabuf[0].len = size; + flags = 0; + +#if 0 + handler = ev->handler; +#else + handler = NULL; +#endif + + rc = WSARecv(c->fd, wsabuf, 1, &bytes, &flags, + (LPWSAOVERLAPPED) &ev->ovlp, handler); + + ngx_log_debug(ev->log, "WSARecv: %d:%d" _ rc _ bytes); + + if (rc == -1) { + err = ngx_socket_errno; + if (err == WSA_IO_PENDING) { + return NGX_AGAIN; + + } else { + ngx_log_error(NGX_LOG_CRIT, ev->log, err, "WSARecv() failed"); + return NGX_ERROR; + } + } + +#if (HAVE_IOCP_EVENT) /* iocp */ + + if (ngx_event_flags & NGX_HAVE_IOCP_EVENT) { + return NGX_AGAIN; + } + +#endif + + return bytes; +} |
