summaryrefslogtreecommitdiffhomepage
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/modules/ngx_iocp_module.c15
-rw-r--r--src/event/modules/ngx_kqueue_module.c2
-rw-r--r--src/event/modules/ngx_select_module.c10
-rw-r--r--src/event/ngx_event.h1
-rw-r--r--src/event/ngx_event_connect.c10
-rw-r--r--src/event/ngx_event_connectex.c200
-rw-r--r--src/event/ngx_event_pipe.c2
7 files changed, 225 insertions, 15 deletions
diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c
index 7b89227a3..9612c9ab9 100644
--- a/src/event/modules/ngx_iocp_module.c
+++ b/src/event/modules/ngx_iocp_module.c
@@ -110,7 +110,7 @@ static int ngx_iocp_init(ngx_cycle_t *cycle)
return NGX_ERROR;
}
- ngx_io = ngx_os_io;
+ ngx_io = ngx_iocp_io;
ngx_event_actions = ngx_iocp_module_ctx.actions;
@@ -233,16 +233,19 @@ static int ngx_iocp_process_events(ngx_log_t *log)
ngx_log_debug(log, "iocp ev: %08x" _ ev);
switch (key) {
- case NGX_IOCP_IO:
- ev->complete = 1;
- ev->ready = 1;
- break;
-
case NGX_IOCP_ACCEPT:
if (bytes) {
ev->ready = 1;
}
break;
+
+ case NGX_IOCP_IO:
+ ev->complete = 1;
+ ev->ready = 1;
+ break;
+
+ case NGX_IOCP_CONNECT:
+ ev->ready = 1;
}
ev->available = bytes;
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c
index d43fb0e15..1d43ba10e 100644
--- a/src/event/modules/ngx_kqueue_module.c
+++ b/src/event/modules/ngx_kqueue_module.c
@@ -460,7 +460,7 @@ static int ngx_kqueue_process_events(ngx_log_t *log)
default:
ngx_log_error(NGX_LOG_ALERT, log, 0,
- "unknown kevent filter %d" _ event_list[i].filter);
+ "unexpected kevent filter %d" _ event_list[i].filter);
}
}
diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c
index 1cde073a1..9e7a08f94 100644
--- a/src/event/modules/ngx_select_module.c
+++ b/src/event/modules/ngx_select_module.c
@@ -265,9 +265,6 @@ static int ngx_select_process_events(ngx_log_t *log)
timer = ngx_event_find_timer();
if (timer) {
- tv.tv_sec = timer / 1000;
- tv.tv_usec = (timer % 1000) * 1000;
- tp = &tv;
#if (HAVE_SELECT_CHANGE_TIMEOUT)
delta = 0;
#else
@@ -275,10 +272,13 @@ static int ngx_select_process_events(ngx_log_t *log)
delta = tv.tv_sec * 1000 + tv.tv_usec / 1000;
#endif
+ tv.tv_sec = timer / 1000;
+ tv.tv_usec = (timer % 1000) * 1000;
+ tp = &tv;
+
} else {
- timer = 0;
- tp = NULL;
delta = 0;
+ tp = NULL;
}
#if !(WIN32)
diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h
index 0e020db15..83ca9762c 100644
--- a/src/event/ngx_event.h
+++ b/src/event/ngx_event.h
@@ -277,6 +277,7 @@ extern ngx_event_actions_t ngx_event_actions;
#if (HAVE_IOCP_EVENT)
#define NGX_IOCP_ACCEPT 0
#define NGX_IOCP_IO 1
+#define NGX_IOCP_CONNECT 2
#endif
diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c
index 8fbc51bd8..296ec7c06 100644
--- a/src/event/ngx_event_connect.c
+++ b/src/event/ngx_event_connect.c
@@ -180,8 +180,8 @@ int ngx_event_connect_peer(ngx_peer_connection_t *pc)
ngx_memzero(&addr, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = peer->addr;
addr.sin_port = peer->port;
+ addr.sin_addr.s_addr = peer->addr;
ngx_log_debug(pc->log, "CONNECT: %s" _ peer->addr_port_text.data);
@@ -189,7 +189,10 @@ ngx_log_debug(pc->log, "CONNECT: %s" _ peer->addr_port_text.data);
if (rc == -1) {
err = ngx_socket_errno;
- if (err != NGX_EINPROGRESS) {
+
+ /* Winsock returns WSAEWOULDBLOCK */
+
+ if (err != NGX_EINPROGRESS && err != NGX_EAGAIN) {
ngx_log_error(NGX_LOG_ERR, pc->log, err, "connect() failed");
if (ngx_close_socket(s) == -1) {
@@ -221,7 +224,8 @@ ngx_log_debug(pc->log, "CONNECT: %s" _ peer->addr_port_text.data);
/*
* aio allows to post operation on non-connected socket
- * at least in FreeBSD
+ * at least in FreeBSD.
+ * NT does not support it.
*
* TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
*/
diff --git a/src/event/ngx_event_connectex.c b/src/event/ngx_event_connectex.c
new file mode 100644
index 000000000..79833c647
--- /dev/null
+++ b/src/event/ngx_event_connectex.c
@@ -0,0 +1,200 @@
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+
+#define NGX_MAX_PENDING_CONN 10
+
+
+static CRITICAL_SECTION connect_lock;
+static int nconnects;
+static ngx_connection_t pending_connects[NGX_MAX_PENDING_CONN];
+
+static HANDLE pending_connect_event;
+
+__declspec(thread) int nevents = 0;
+__declspec(thread) WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
+__declspec(thread) ngx_connection_t *conn[WSA_MAXIMUM_WAIT_EVENTS];
+
+
+
+int ngx_iocp_wait_connect(ngx_connection_t *c)
+{
+ for ( ;; ) {
+ EnterCriticalSection(&connect_lock);
+
+ if (nconnects < NGX_MAX_PENDING_CONN) {
+ pending_connects[--nconnects] = c;
+ LeaveCriticalSection(&connect_lock);
+
+ if (SetEvent(pending_connect_event) == 0) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
+ "SetEvent() failed");
+ return NGX_ERROR;
+
+ break;
+ }
+
+ LeaveCriticalSection(&connect_lock);
+ ngx_log_error(NGX_LOG_NOTICE, c->log, 0,
+ "max number of pending connect()s is %d",
+ NGX_MAX_PENDING_CONN);
+ msleep(100);
+ }
+
+ if (!started) {
+ if (ngx_iocp_new_thread(1) == NGX_ERROR) {
+ return NGX_ERROR;
+ }
+ started = 1;
+ }
+
+ return NGX_OK;
+}
+
+
+int ngx_iocp_new_thread(int main)
+{
+ u_int id;
+
+ if (main) {
+ pending_connect_event = CreateEvent(NULL, 0, 1, NULL);
+ if (pending_connect_event == INVALID_HANDLE_VALUE) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
+ "CreateThread() failed");
+ return NGX_ERROR;
+ }
+ }
+
+ if (CreateThread(NULL, 0, ngx_iocp_wait_events, main, 0, &id)
+ == INVALID_HANDLE_VALUE)
+ {
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
+ "CreateThread() failed");
+ return NGX_ERROR;
+ }
+
+ SetEvent(event) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
+ "SetEvent() failed");
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+int ngx_iocp_new_connect()
+{
+ EnterCriticalSection(&connect_lock);
+ c = pending_connects[--nconnects];
+ LeaveCriticalSection(&connect_lock);
+
+ conn[nevents] = c;
+
+ events[nevents] = WSACreateEvent();
+ if (events[nevents] == INVALID_HANDLE_VALUE) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno,
+ "WSACreateEvent() failed");
+ return NGX_ERROR;
+ }
+
+ if (WSAEventSelect(c->fd, events[nevents], FD_CONNECT) == -1)
+ ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno,
+ "WSAEventSelect() failed");
+ return NGX_ERROR;
+ }
+
+ nevents++;
+
+ return NGX_OK;
+}
+
+
+void ngx_iocp_wait_events(int main)
+{
+ WSANETWORKEVENTS ne;
+
+ nevents = 1;
+ events[0] = pending_connect_event;
+ conn[0] = NULL;
+
+ for ( ;; ) {
+ offset = (nevents == WSA_MAXIMUM_WAIT_EVENTS) ? 1: 0;
+ timeout = (nevents == 1 && !first) ? 60000: INFINITE;
+
+ n = WSAWaitForMultipleEvents(nevents - offset, events[offset],
+ 0, timeout, 0);
+ if (n == WAIT_FAILED) {
+ ngx_log_error(NGX_LOG_ALERT, log, ngx_socket_errno,
+ "WSAWaitForMultipleEvents() failed");
+ continue;
+ }
+
+ if (n == WAIT_TIMEOUT) {
+ if (nevents == 1 && !main) {
+ ExitThread(0);
+ }
+
+ ngx_log_error(NGX_LOG_ALERT, log, 0,
+ "WSAWaitForMultipleEvents() "
+ "returned unexpected WAIT_TIMEOUT");
+ continue;
+ }
+
+ n -= WSA_WAIT_EVENT_0;
+
+ if (n == 0) {
+
+ /* the first event is pending_connect_event */
+
+ if (nevents == WSA_MAXIMUM_WAIT_EVENTS) {
+ ngx_iocp_new_thread(0);
+ } else {
+ ngx_iocp_new_connect();
+ }
+
+ continue;
+ }
+
+ if (WSAEnumNetworkEvents(c[n].fd, events[n], &ne) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, log, ngx_socket_errno,
+ "WSAEnumNetworkEvents() failed");
+ continue;
+ }
+
+ if (ne.lNetworkEvents & FD_CONNECT) {
+ conn[n].write->ovlp.error = ne.iErrorCode[FD_CONNECT_BIT];
+
+ if (PostQueuedCompletionStatus(iocp, 0, NGX_IOCP_CONNECT,
+ &conn[n].write->ovlp) == 0)
+ {
+ ngx_log_error(NGX_LOG_ALERT, log, ngx_socket_errno,
+ "PostQueuedCompletionStatus() failed");
+ continue;
+ }
+
+ if (n < nevents) {
+ conn[n] = conn[nevents];
+ events[n] = events[nevents];
+ }
+
+ nevents--;
+ continue;
+ }
+
+ if (ne.lNetworkEvents & FD_ACCEPT) {
+
+ /* CHECK ERROR ??? */
+
+ ngx_event_post_acceptex(conn[n].listening, 1);
+ continue;
+ }
+
+ ngx_log_error(NGX_LOG_ALERT, c[n].log, 0,
+ "WSAWaitForMultipleEvents() "
+ "returned unexpected network event %lu",
+ ne.lNetworkEvents);
+ }
+}
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index af68d4528..2dcb6631d 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -566,6 +566,8 @@ int ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_hunk_t *hunk)
ngx_hunk_t *h;
ngx_chain_t *cl;
+ngx_log_debug(p->log, "COPY");
+
if (hunk->pos == hunk->last) {
return NGX_OK;
}