diff options
Diffstat (limited to '')
| -rw-r--r-- | src/event/modules/ngx_aio_module.c | 37 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 240 | ||||
| -rw-r--r-- | src/event/modules/ngx_kqueue_module.h | 16 | ||||
| -rw-r--r-- | src/event/modules/ngx_overlapped_module.c | 17 | ||||
| -rw-r--r-- | src/event/modules/ngx_select_module.c | 349 | ||||
| -rw-r--r-- | src/event/modules/ngx_select_module.h | 16 | ||||
| -rw-r--r-- | src/event/ngx_event.c | 87 | ||||
| -rw-r--r-- | src/event/ngx_event.h | 127 | ||||
| -rw-r--r-- | src/event/ngx_event_accept.c | 83 | ||||
| -rw-r--r-- | src/event/ngx_event_accept.h | 10 | ||||
| -rw-r--r-- | src/event/ngx_event_close.c | 29 | ||||
| -rw-r--r-- | src/event/ngx_event_close.h | 10 | ||||
| -rw-r--r-- | src/event/ngx_event_read.c | 55 | ||||
| -rw-r--r-- | src/event/ngx_event_write.c | 136 | ||||
| -rw-r--r-- | src/event/ngx_event_write.h | 13 |
15 files changed, 1225 insertions, 0 deletions
diff --git a/src/event/modules/ngx_aio_module.c b/src/event/modules/ngx_aio_module.c new file mode 100644 index 000000000..939c5c07e --- /dev/null +++ b/src/event/modules/ngx_aio_module.c @@ -0,0 +1,37 @@ + +int ngx_posix_aio_process_events(ngx_log_t *log) +{ + unmask signal + + listen via signal; + + aio_suspend()/aiowait()/aio_waitcomplete(); + + mask signal + + if (ngx_socket_errno == NGX_EINTR) + look listen + select()/accept() nb listen sockets + else + aio +} + +int ngx_posix_aio_process_events(ngx_log_t *log) +{ + unmask signal + + /* BUG: signal can be delivered before select() */ + + select(listen); + + mask signal + + if (ngx_socket_errno == NGX_EINTR) + look ready array +} + +void aio_sig_handler(int signo, siginfo_t *siginfo, void *context) +{ + push siginfo->si_value.sival_ptr +} + diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c new file mode 100644 index 000000000..54e9474ab --- /dev/null +++ b/src/event/modules/ngx_kqueue_module.c @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2002 Igor Sysoev, http://sysoev.ru + */ + +/* + NEED ? : unify change_list and event_list: + event_list = change_list; +*/ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_kqueue_module.h> + +#if (USE_KQUEUE) && !(HAVE_KQUEUE) +#error "kqueue is not supported on this platform" +#endif + +static void ngx_add_timer(ngx_event_t *ev, u_int timer); +static void ngx_inline ngx_del_timer(ngx_event_t *ev); + + +static int kq; +static struct kevent *change_list, *event_list; +static int nchanges, nevents; + +static ngx_event_t timer_queue; + +void ngx_kqueue_init(int max_connections, ngx_log_t *log) +{ + int size = sizeof(struct kevent) * 512; + + nchanges = 0; + nevents = 512; + + if ((kq = kqueue()) == -1) + ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, + "ngx_kqueue_init: kqueue failed"); + + change_list = ngx_alloc(size, log); + event_list = ngx_alloc(size, log); + + timer_queue.timer_prev = &timer_queue; + timer_queue.timer_next = &timer_queue; + +#if !(USE_KQUEUE) + ngx_event_actions.add = ngx_kqueue_add_event; + ngx_event_actions.del = ngx_kqueue_del_event; + ngx_event_actions.process = ngx_kqueue_process_events; +#endif + +} + +int ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags) +{ + if (event == NGX_TIMER_EVENT) { + ngx_add_timer(ev, flags); + return 0; + } + + return ngx_kqueue_set_event(ev, event, EV_ADD | flags); +} + +int ngx_kqueue_del_event(ngx_event_t *ev, int event) +{ + if (event == NGX_TIMER_EVENT) { + ngx_del_timer(ev); + return 0; + } + + return ngx_kqueue_set_event(ev, event, EV_DELETE); +} + +int ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags) +{ + struct timespec ts = { 0, 0 }; + ngx_connection_t *cn = (ngx_connection_t *) ev->data; + + ngx_log_debug(ev->log, "ngx_kqueue_set_event: %d: ft:%d f:%08x" _ + cn->fd _ filter _ flags); + + if (nchanges >= nevents) { + ngx_log_error(NGX_LOG_WARN, ev->log, 0, + "ngx_kqueue_set_event: change list is filled up"); + + if (kevent(kq, change_list, nchanges, NULL, 0, &ts) == -1) { + ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, + "ngx_kqueue_set_event: kevent failed"); + return -1; + } + nchanges = 0; + } + + change_list[nchanges].ident = cn->fd; + change_list[nchanges].filter = filter; + change_list[nchanges].flags = flags; + change_list[nchanges].fflags = 0; + change_list[nchanges].data = 0; + change_list[nchanges].udata = ev; + nchanges++; + + return 0; +} + +int ngx_kqueue_process_events(ngx_log_t *log) +{ + int events, i; + u_int timer = 0, delta = 0; + ngx_event_t *ev, *nx; + struct timeval tv; + struct timespec ts, *tp = NULL; + + if (timer_queue.timer_next != &timer_queue) { + timer = timer_queue.timer_next->timer_delta; + ts.tv_sec = timer / 1000; + ts.tv_nsec = (timer % 1000) * 1000000; + tp = &ts; + gettimeofday(&tv, NULL); + delta = tv.tv_sec * 1000 + tv.tv_usec / 1000; + } + + ngx_log_debug(log, "ngx_kqueue_process_events: timer: %d" _ timer); + + if ((events = kevent(kq, change_list, nchanges, event_list, nevents, tp)) + == -1) { + ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, + "ngx_kqueue_process_events: kevent failed"); + return -1; + } + + nchanges = 0; + + if (timer) { + gettimeofday(&tv, NULL); + delta = tv.tv_sec * 1000 + tv.tv_usec / 1000 - delta; + + } else { + ngx_assert((events != 0), return -1, log, + "ngx_kqueue_process_events: " + "kevent returns no events without timeout"); + } + + ngx_log_debug(log, "ngx_kqueue_process_events: " + "timer: %d, delta: %d" _ timer _ delta); + + if (timer) { + if (delta >= timer) { + for (ev = timer_queue.timer_next; + ev != &timer_queue && delta >= ev->timer_delta; + /* void */) + { + delta -= ev->timer_delta; + nx = ev->timer_next; + ngx_del_timer(ev); + if (ev->timer_handler(ev) == -1) + ev->close_handler(ev); + ev = nx; + } + + } else { + timer_queue.timer_next->timer_delta -= delta; + } + } + + for (i = 0; i < events; i++) { + + ngx_log_debug(log, "ngx_kqueue_process_events: kevent: " + "%d: ft:%d f:%08x ff:%08x d:%d ud:%08x" _ + event_list[i].ident _ event_list[i].filter _ + event_list[i].flags _ event_list[i].fflags _ + event_list[i].data _ event_list[i].udata); + + if (event_list[i].flags & EV_ERROR) { + ngx_log_error(NGX_LOG_ALERT, log, event_list[i].data, + "ngx_kqueue_process_events: kevent error"); + continue; + } + + ev = (ngx_event_t *) event_list[i].udata; + + switch (event_list[i].filter) { + + case EVFILT_READ: + case EVFILT_WRITE: + ev->ready = 1; + ev->available = event_list[i].data; + + if (event_list[i].flags & EV_EOF) { + ev->eof = 1; + ev->error = event_list[i].fflags; + } + + if (ev->event_handler(ev) == -1) + ev->close_handler(ev); + + break; + + default: + ngx_assert(0, /* void */, log, + "ngx_kqueue_process_events: unknown filter %d" _ + event_list[i].filter); + } + } + + return 0; +} + +static void ngx_add_timer(ngx_event_t *ev, u_int timer) +{ + ngx_event_t *e; + + for (e = timer_queue.timer_next; + e != &timer_queue && timer > e->timer_delta; + e = e->timer_next) + timer -= e->timer_delta; + + ev->timer_delta = timer; + + ev->timer_next = e; + ev->timer_prev = e->timer_prev; + + e->timer_prev->timer_next = ev; + e->timer_prev = ev; +} + +static void ngx_inline ngx_del_timer(ngx_event_t *ev) +{ + if (ev->timer_prev) + ev->timer_prev->timer_next = ev->timer_next; + + if (ev->timer_next) { + ev->timer_next->timer_prev = ev->timer_prev; + ev->timer_prev = NULL; + } + + if (ev->timer_prev) + ev->timer_next = NULL; +} diff --git a/src/event/modules/ngx_kqueue_module.h b/src/event/modules/ngx_kqueue_module.h new file mode 100644 index 000000000..38509a083 --- /dev/null +++ b/src/event/modules/ngx_kqueue_module.h @@ -0,0 +1,16 @@ +#ifndef _NGX_KQUEUE_MODULE_H_INCLUDED_ +#define _NGX_KQUEUE_MODULE_H_INCLUDED_ + + +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_event.h> + +void ngx_kqueue_init(int max_connections, ngx_log_t *log); +int ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags); +int ngx_kqueue_del_event(ngx_event_t *ev, int event); +int ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags); +int ngx_kqueue_process_events(ngx_log_t *log); + + +#endif /* _NGX_KQUEUE_MODULE_H_INCLUDED_ */ diff --git a/src/event/modules/ngx_overlapped_module.c b/src/event/modules/ngx_overlapped_module.c new file mode 100644 index 000000000..2bb4d7f69 --- /dev/null +++ b/src/event/modules/ngx_overlapped_module.c @@ -0,0 +1,17 @@ + +int ngx_overlapped_process_events(ngx_log_t *log) +{ + if (acceptex) + event = SleepEx(timer, 1); + else + event = WSAWaitForMultipleEvents(n_events, events, 0, timer, 1); + + if (event == WSA_IO_COMPLETION) + look ready array +} + +void CALLBACK overlapped_completion_procedure(DWORD error, DWORD nbytes, + LPWSAOVERLAPPED overlapped, DWORD flags) +{ + push overlapped; +} diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c new file mode 100644 index 000000000..6c9e8f3c2 --- /dev/null +++ b/src/event/modules/ngx_select_module.c @@ -0,0 +1,349 @@ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_time.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_select_module.h> + +static fd_set master_read_fds; +static fd_set master_write_fds; +static fd_set work_read_fds; +static fd_set work_write_fds; + +#if (WIN32) +static int max_read; +static int max_write; +#else +static int max_fd; +#endif + +static ngx_event_t event_queue; +static ngx_event_t timer_queue; + + +static void ngx_add_timer(ngx_event_t *ev, u_int timer); +static void ngx_inline ngx_del_timer(ngx_event_t *ev); + +static fd_set *ngx_select_get_fd_set(ngx_socket_t fd, int event, + ngx_log_t *log); + +void ngx_select_init(int max_connections, ngx_log_t *log) +{ +#if (WIN32) + if (max_connections > FD_SETSIZE) + ngx_log_error(NGX_LOG_EMERG, log, 0, + "ngx_select_init: maximum number of descriptors " + "supported by select() is %d", + FD_SETSIZE); +#else + if (max_connections >= FD_SETSIZE) + ngx_log_error(NGX_LOG_EMERG, log, 0, + "ngx_select_init: maximum descriptor number" + "supported by select() is %d", + FD_SETSIZE - 1); +#endif + + FD_ZERO(&master_read_fds); + FD_ZERO(&master_write_fds); + + event_queue.prev = &event_queue; + event_queue.next = &event_queue; + + timer_queue.timer_prev = &timer_queue; + timer_queue.timer_next = &timer_queue; + + ngx_event_actions.add = ngx_select_add_event; + ngx_event_actions.del = ngx_select_del_event; + ngx_event_actions.process = ngx_select_process_events; + +#if (WIN32) + max_read = max_write = 0; +#else + max_fd = -1; +#endif +} + +int ngx_select_add_event(ngx_event_t *ev, int event, u_int flags) +{ + fd_set *fds; + ngx_connection_t *cn = (ngx_connection_t *) ev->data; + + if (event == NGX_TIMER_EVENT) { + ngx_add_timer(ev, flags); + return 0; + } + + ngx_assert((flags != NGX_ONESHOT_EVENT), return -1, ev->log, + "ngx_select_add_event: NGX_ONESHOT_EVENT is not supported"); + + fds = ngx_select_get_fd_set(cn->fd, event, ev->log); + if (fds == NULL) + return -1; + + ev->prev = &event_queue; + ev->next = event_queue.next; + event_queue.next->prev = ev; + event_queue.next = ev; + + FD_SET(cn->fd, fds); + +#if (WIN32) + switch (event) { + case NGX_READ_EVENT: + max_read++; + break; + case NGX_WRITE_EVENT: + max_write++; + break; + } +#else + if (max_fd != -1 && max_fd < cn->fd) + max_fd = cn->fd; +#endif + + return 0; +} + +int ngx_select_del_event(ngx_event_t *ev, int event) +{ + fd_set *fds; + ngx_connection_t *cn = (ngx_connection_t *) ev->data; + + if (event == NGX_TIMER_EVENT) { + ngx_del_timer(ev); + return 0; + } + + fds = ngx_select_get_fd_set(cn->fd, event, ev->log); + if (fds == NULL) + return -1; + + if (ev->prev) + ev->prev->next = ev->next; + + if (ev->next) { + ev->next->prev = ev->prev; + ev->prev = NULL; + } + + if (ev->prev) + ev->next = NULL; + + FD_CLR(cn->fd, fds); + +#if (WIN32) + switch (event) { + case NGX_READ_EVENT: + max_read--; + break; + case NGX_WRITE_EVENT: + max_write--; + break; + } +#else + if (max_fd == cn->fd) + max_fd = -1; +#endif + + return 0; +} + +static fd_set *ngx_select_get_fd_set(ngx_socket_t fd, int event, ngx_log_t *log) +{ + ngx_log_debug(log, "ngx_select_get_fd_set: %d %d" _ fd _ event); + +#if !(WIN32) + if (fd >= FD_SETSIZE) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "ngx_select_get_event: maximum descriptor number" + "supported by select() is %d", + FD_SETSIZE - 1); + return NULL; + } +#endif + + switch (event) { + case NGX_READ_EVENT: +#if (WIN32) + if (max_read >= FD_SETSIZE) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "ngx_select_get_event: maximum number of descriptors " + "supported by select() is %d", + FD_SETSIZE); + return NULL; + } +#endif + return &master_read_fds; + + case NGX_WRITE_EVENT: +#if (WIN32) + if (max_write >= FD_SETSIZE) { + ngx_log_error(NGX_LOG_ERR, log, 0, + "ngx_select_get_event: maximum number of descriptors " + "supported by select() is %d", + FD_SETSIZE); + return NULL; + } +#endif + return &master_write_fds; + + default: + ngx_assert(0, return NULL, log, + "ngx_select_get_fd_set: invalid event %d" _ event); + } + + return NULL; +} + +int ngx_select_process_events(ngx_log_t *log) +{ + int ready, found; + u_int timer, delta; + ngx_event_t *ev, *nx; + ngx_connection_t *cn; + struct timeval tv, *tp; + + work_read_fds = master_read_fds; + work_write_fds = master_write_fds; + + if (timer_queue.timer_next != &timer_queue) { + timer = timer_queue.timer_next->timer_delta; + tv.tv_sec = timer / 1000; + tv.tv_usec = (timer % 1000) * 1000; + tp = &tv; + + delta = ngx_msec(); + + } else { + timer = 0; + tp = NULL; + delta = 0; + } + +#if !(WIN32) + if (max_fd == -1) { + for (ev = event_queue.next; ev != &event_queue; ev = ev->next) { + cn = (ngx_connection_t *) ev->data; + if (max_fd < cn->fd) + max_fd = cn->fd; + } + + ngx_log_debug(log, "ngx_select_process_events: change max_fd: %d" _ + max_fd); + } +#endif + + ngx_log_debug(log, "ngx_select_process_events: timer: %d" _ timer); + +#if (WIN32) + if ((ready = select(0, &work_read_fds, &work_write_fds, NULL, tp)) +#else + if ((ready = select(max_fd + 1, &work_read_fds, &work_write_fds, NULL, tp)) +#endif + == -1) { + ngx_log_error(NGX_LOG_ALERT, log, ngx_socket_errno, + "ngx_select_process_events: select failed"); + return -1; + } + + ngx_log_debug(log, "ngx_select_process_events: ready %d" _ ready); + + if (timer) { + delta = ngx_msec() - delta; + + } else { + ngx_assert((ready != 0), return -1, log, + "ngx_select_process_events: " + "select returns no events without timeout"); + } + + ngx_log_debug(log, "ngx_select_process_events: " + "timer: %d, delta: %d" _ timer _ delta); + + if (timer) { + if (delta >= timer) { + for (ev = timer_queue.timer_next; + ev != &timer_queue && delta >= ev->timer_delta; + /* void */) + { + delta -= ev->timer_delta; + nx = ev->timer_next; + ngx_del_timer(ev); + if (ev->timer_handler(ev) == -1) + ev->close_handler(ev); + ev = nx; + } + + } else { + timer_queue.timer_next->timer_delta -= delta; + } + } + + for (ev = event_queue.next; ev != &event_queue; ev = ev->next) { + cn = (ngx_connection_t *) ev->data; + found = 0; + + if (ev->write) { + if (FD_ISSET(cn->fd, &work_write_fds)) { + ngx_log_debug(log, "ngx_select_process_events: write %d" _ + cn->fd); + found = 1; + } + + } else { + if (FD_ISSET(cn->fd, &work_read_fds)) { + ngx_log_debug(log, "ngx_select_process_events: read %d" _ + cn->fd); + found = 1; + } + } + + if (found) { + ev->ready = 1; + if (ev->event_handler(ev) == -1) + ev->close_handler(ev); + + ready--; + } + + } + + ngx_assert((ready == 0), return 0, log, + "ngx_select_process_events: ready != events"); + + return 0; +} + +static void ngx_add_timer(ngx_event_t *ev, u_int timer) +{ + ngx_event_t *e; + + for (e = timer_queue.timer_next; + e != &timer_queue && timer > e->timer_delta; + e = e->timer_next) + timer -= e->timer_delta; + + ev->timer_delta = timer; + + ev->timer_next = e; + ev->timer_prev = e->timer_prev; + + e->timer_prev->timer_next = ev; + e->timer_prev = ev; +} + +static void ngx_inline ngx_del_timer(ngx_event_t *ev) +{ + if (ev->timer_prev) + ev->timer_prev->timer_next = ev->timer_next; + + if (ev->timer_next) { + ev->timer_next->timer_prev = ev->timer_prev; + ev->timer_prev = NULL; + } + + if (ev->timer_prev) + ev->timer_next = NULL; +} diff --git a/src/event/modules/ngx_select_module.h b/src/event/modules/ngx_select_module.h new file mode 100644 index 000000000..6516981f4 --- /dev/null +++ b/src/event/modules/ngx_select_module.h @@ -0,0 +1,16 @@ +#ifndef _NGX_SELECT_MODULE_H_INCLUDED_ +#define _NGX_SELECT_MODULE_H_INCLUDED_ + + +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_event.h> + +void ngx_select_init(int max_connections, ngx_log_t *log); +int ngx_select_add_event(ngx_event_t *ev, int event, u_int flags); +int ngx_select_del_event(ngx_event_t *ev, int event); +int ngx_select_set_event(ngx_event_t *ev, int filter, u_int flags); +int ngx_select_process_events(ngx_log_t *log); + + +#endif /* _NGX_SELECT_MODULE_H_INCLUDED_ */ diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c new file mode 100644 index 000000000..14a36fbf5 --- /dev/null +++ b/src/event/ngx_event.c @@ -0,0 +1,87 @@ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_alloc.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_event_accept.h> + +#include <ngx_select_module.h> +#if (HAVE_KQUEUE) +#include <ngx_kqueue_module.h> +#endif + + +ngx_connection_t *ngx_connections; +ngx_event_t *ngx_read_events, *ngx_write_events; + +#if !(USE_KQUEUE) + +#if 1 +ngx_event_type_e ngx_event_type = NGX_SELECT_EVENT; +#else +ngx_event_type_e ngx_event_type = NGX_KQUEUE_EVENT; +#endif + +ngx_event_actions_t ngx_event_actions; + +/* ngx_event_type_e order */ +static void (*ngx_event_init[]) (int max_connections, ngx_log_t *log) = { + ngx_select_init, +#if (HAVE_POLL) + ngx_poll_init, +#endif +#if (HAVE_KQUEUE) + ngx_kqueue_init +#endif +}; + +#endif /* USE_KQUEUE */ + + +void ngx_worker(ngx_listen_t *sock, int n, ngx_pool_t *pool, ngx_log_t *log) +{ + int i, fd; + + /* per group */ + int max_connections = 512; + + ngx_init_events(max_connections, log); + + ngx_read_events = ngx_alloc(sizeof(ngx_event_t) * max_connections, log); + ngx_write_events = ngx_alloc(sizeof(ngx_event_t) * max_connections, log); + ngx_connections = ngx_alloc(sizeof(ngx_connection_t) + * max_connections, log); + + /* for each listening socket */ + for (i = 0; i < n; i++) { + fd = sock[i].fd; + + ngx_memzero(&ngx_read_events[fd], sizeof(ngx_event_t)); + ngx_memzero(&ngx_write_events[fd], sizeof(ngx_event_t)); + ngx_memzero(&ngx_connections[fd], sizeof(ngx_connection_t)); + + ngx_connections[fd].fd = fd; + ngx_connections[fd].server = sock[i].server; + ngx_connections[fd].read = (void *) &ngx_read_events[fd].data; + ngx_read_events[fd].data = &ngx_connections[fd]; + ngx_read_events[fd].log = ngx_connections[fd].log = sock[i].log; + ngx_read_events[fd].data = &ngx_connections[fd]; + ngx_read_events[fd].event_handler = &ngx_event_accept; + ngx_read_events[fd].listening = 1; + + ngx_read_events[fd].available = 0; + +#if (HAVE_DEFERRED_ACCEPT) + ngx_read_events[fd].accept_filter = sock->accept_filter; +#endif + ngx_add_event(&ngx_read_events[fd], NGX_READ_EVENT, 0); + } + + while (1) { + ngx_log_debug(log, "ngx_worker cycle"); + + ngx_process_events(log); + } +} diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h new file mode 100644 index 000000000..7087e6950 --- /dev/null +++ b/src/event/ngx_event.h @@ -0,0 +1,127 @@ +#ifndef _NGX_EVENT_H_INCLUDED_ +#define _NGX_EVENT_H_INCLUDED_ + + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_log.h> + +typedef struct ngx_event_s ngx_event_t; + +struct ngx_event_s { + void *data; + + int (*event_handler)(ngx_event_t *ev); + int (*close_handler)(ngx_event_t *ev); + void *context; + char *action; + + ngx_event_t *prev; /* queue in select(), poll() */ + ngx_event_t *next; + + int (*timer_handler)(ngx_event_t *ev); + ngx_event_t *timer_prev; + ngx_event_t *timer_next; + + u_int timer_delta; + u_int timer; + + ngx_log_t *log; + + int available; /* kqueue only: */ + /* accept: number of sockets that wait */ + /* to be accepted */ + /* read: bytes to read */ + /* write: available space in buffer */ + /* otherwise: */ + /* accept: 1 if accept many, 0 otherwise */ + + /* flags - int are probably faster on write then bits ??? */ + unsigned listening:1; + unsigned write:1; + + unsigned ready:1; + unsigned timedout:1; + unsigned process:1; + unsigned read_discarded:1; + + unsigned unexpected_eof:1; + +#if (HAVE_DEFERRED_ACCEPT) + unsigned accept_filter:1; +#endif +#if (HAVE_KQUEUE) + unsigned eof:1; + int errno; +#endif +}; + +typedef enum { + NGX_SELECT_EVENT = 0, +#if (HAVE_POLL) + NGX_POLL_EVENT, +#endif +#if (HAVE_KQUEUE) + NGX_KQUEUE_EVENT, +#endif +} ngx_event_type_e ; + +typedef struct { + int (*add)(ngx_event_t *ev, int event, u_int flags); + int (*del)(ngx_event_t *ev, int event); + int (*process)(ngx_log_t *log); +/* + int (*read)(ngx_event_t *ev, char *buf, size_t size); + int (*write)(ngx_event_t *ev, char *buf, size_t size); +*/ +} ngx_event_actions_t; + + +#if (HAVE_KQUEUE) + +#define NGX_READ_EVENT EVFILT_READ +#define NGX_WRITE_EVENT EVFILT_WRITE +#define NGX_TIMER_EVENT (-EVFILT_SYSCOUNT - 1) + +#define NGX_ONESHOT_EVENT EV_ONESHOT +#define NGX_CLEAR_EVENT EV_CLEAR + +#else + +#define NGX_READ_EVENT 0 +#define NGX_WRITE_EVENT 1 +#define NGX_TIMER_EVENT 2 + +#define NGX_ONESHOT_EVENT 1 +#define NGX_CLEAR_EVENT 2 + +#endif + + +#if (USE_KQUEUE) +#define ngx_init_events ngx_kqueue_init +#define ngx_process_events ngx_kqueue_process_events +#define ngx_add_event ngx_kqueue_add_event +#define ngx_del_event ngx_kqueue_del_event +#else +#define ngx_init_events (ngx_event_init[ngx_event_type]) +#define ngx_process_events ngx_event_actions.process +#define ngx_add_event ngx_event_actions.add +#define ngx_del_event ngx_event_actions.del +#endif + + +extern ngx_event_t *ngx_read_events; +extern ngx_event_t *ngx_write_events; +extern ngx_connection_t *ngx_connections; + +#if !(USE_KQUEUE) +extern ngx_event_actions_t ngx_event_actions; +extern ngx_event_type_e ngx_event_type; +#endif + + +void ngx_worker(ngx_listen_t *sock, int n, ngx_pool_t *pool, ngx_log_t *log); + + +#endif /* _NGX_EVENT_H_INCLUDED_ */ diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c new file mode 100644 index 000000000..0cadf543f --- /dev/null +++ b/src/event/ngx_event_accept.c @@ -0,0 +1,83 @@ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_log.h> +#include <ngx_connection.h> +#include <ngx_event.h> +#include <ngx_event_close.h> +#include <ngx_event_accept.h> + + +int ngx_event_accept(ngx_event_t *ev) +{ + ngx_err_t err; + ngx_socket_t s; + struct sockaddr_in addr; + int addrlen = sizeof(struct sockaddr_in); + ngx_connection_t *cn = (ngx_connection_t *) ev->data; + + ngx_log_debug(ev->log, "ngx_event_accept: accept ready: %d" _ + ev->available); + + ev->ready = 0; + + do { + if ((s = accept(cn->fd, (struct sockaddr *) &addr, &addrlen)) == -1) { + err = ngx_socket_errno; + if (err == NGX_EAGAIN) { + ngx_log_error(NGX_LOG_INFO, ev->log, err, + "ngx_event_accept: EAGAIN while accept"); + return 0; + } + + ngx_log_error(NGX_LOG_ERR, ev->log, err, + "ngx_event_accept: accept failed"); + /* if we return -1 listen socket would be closed */ + return 0; + } + + ngx_log_debug(ev->log, "ngx_event_accept: accepted socket: %d" _ s); + + ngx_memzero(&ngx_read_events[s], sizeof(ngx_event_t)); + ngx_memzero(&ngx_write_events[s], sizeof(ngx_event_t)); + ngx_memzero(&ngx_connections[s], sizeof(ngx_connection_t)); + + ngx_read_events[s].data = ngx_write_events[s].data + = &ngx_connections[s]; + ngx_connections[s].read = &ngx_read_events[s]; + ngx_connections[s].write = &ngx_write_events[s]; + + ngx_connections[s].fd = s; + ngx_read_events[s].unexpected_eof = 1; + ngx_write_events[s].ready = 1; + + ngx_write_events[s].timer = ngx_read_events[s].timer = 10000; + + ngx_write_events[s].timer_handler = + ngx_read_events[s].timer_handler = ngx_event_close; + + ngx_write_events[s].close_handler = + ngx_read_events[s].close_handler = ngx_event_close; + + ngx_connections[s].server = cn->server; + ngx_connections[s].servers = cn->servers; + ngx_connections[s].log = + ngx_read_events[s].log = ngx_write_events[s].log = ev->log; + +#if (HAVE_DEFERRED_ACCEPT) + if (ev->accept_filter) + ngx_read_events[s].ready = 1; +#endif + + cn->server->handler(&ngx_connections[s]); + +#if (HAVE_KQUEUE) +#if !(USE_KQUEUE) + if (ngx_event_type == NGX_KQUEUE_EVENT) +#endif + ev->available--; +#endif + } while (ev->available); + + return 0; +} diff --git a/src/event/ngx_event_accept.h b/src/event/ngx_event_accept.h new file mode 100644 index 000000000..7596c6e7f --- /dev/null +++ b/src/event/ngx_event_accept.h @@ -0,0 +1,10 @@ +#ifndef _NGX_EVENT_ACCEPT_H_INCLUDED_ +#define _NGX_EVENT_ACCEPT_H_INCLUDED_ + + +#include <ngx_event.h> + +int ngx_event_accept(ngx_event_t *ev); + + +#endif /* _NGX_EVENT_ACCEPT_H_INCLUDED_ */ diff --git a/src/event/ngx_event_close.c b/src/event/ngx_event_close.c new file mode 100644 index 000000000..270855a5d --- /dev/null +++ b/src/event/ngx_event_close.c @@ -0,0 +1,29 @@ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_connection.h> +#include <ngx_event_close.h> + + +int ngx_event_close(ngx_event_t *ev) +{ + int rc; + ngx_connection_t *cn = (ngx_connection_t *) ev->data; + + ngx_assert((cn->fd != -1), return -1, ev->log, + "ngx_event_close: already closed"); + + if ((rc = ngx_close_socket(cn->fd)) == -1) + ngx_log_error(NGX_LOG_ERR, ev->log, ngx_socket_errno, + "ngx_event_close: close failed"); + + if (cn->read->next) + ngx_del_event(cn->read, NGX_READ_EVENT); + + if (cn->write->next) + ngx_del_event(cn->write, NGX_WRITE_EVENT); + + cn->fd = -1; + + return rc; +} diff --git a/src/event/ngx_event_close.h b/src/event/ngx_event_close.h new file mode 100644 index 000000000..90c6512d3 --- /dev/null +++ b/src/event/ngx_event_close.h @@ -0,0 +1,10 @@ +#ifndef _NGX_EVENT_CLOSE_H_INCLUDED_ +#define _NGX_EVENT_CLOSE_H_INCLUDED_ + + +#include <ngx_event.h> + +int ngx_event_close(ngx_event_t *ev); + + +#endif /* _NGX_EVENT_CLOSE_H_INCLUDED_ */ diff --git a/src/event/ngx_event_read.c b/src/event/ngx_event_read.c new file mode 100644 index 000000000..b08e5107e --- /dev/null +++ b/src/event/ngx_event_read.c @@ -0,0 +1,55 @@ + +#include <ngx_config.h> +#include <ngx_errno.h> +#include <ngx_log.h> +#include <ngx_connection.h> + +int ngx_event_recv(ngx_connection_t *c, char *buf, size_t size) +{ + int n; + ngx_err_t err; + ngx_event_t *ev = c->read; + +#if (HAVE_KQUEUE) +#if !(USE_KQUEUE) + if (ngx_event_type == NGX_KQUEUE_EVENT) +#endif + if (ev->eof && ev->available == 0) { + if (ev->error) { + ngx_log_error(NGX_LOG_ERR, ev->log, ev->error, + "ngx_event_recv: recv failed while %s", + ev->log->action); + + return -1; + } + + return 0; + } +#endif + + n = recv(c->fd, buf, size, 0); + + if (n == -1) { + err = ngx_socket_errno; + + if (err == NGX_EAGAIN) { + ngx_log_error(NGX_LOG_INFO, ev->log, err, + "ngx_event_recv: EAGAIN while %s", ev->log->action); + return -2; + } + + ngx_log_error(NGX_LOG_INFO, ev->log, err, + "ngx_event_recv: recv failed while %s", ev->log->action); + + return -1; + } + +#if (HAVE_KQUEUE) +#if !(USE_KQUEUE) + if (ngx_event_type == NGX_KQUEUE_EVENT) +#endif + ev->available -= n; +#endif + + return n; +} diff --git a/src/event/ngx_event_write.c b/src/event/ngx_event_write.c new file mode 100644 index 000000000..0a9a482d9 --- /dev/null +++ b/src/event/ngx_event_write.c @@ -0,0 +1,136 @@ + +#include <ngx_config.h> +#include <ngx_types.h> +#include <ngx_alloc.h> +#include <ngx_array.h> +#include <ngx_hunk.h> +#include <ngx_connection.h> +#include <ngx_sendv.h> +#include <ngx_sendfile.h> +#include <ngx_event_write.h> + + +ngx_chain_t *ngx_event_writer(ngx_connection_t *cn, ngx_chain_t *in, + off_t flush) +{ + int rc; + char *last; + off_t sent; + ngx_iovec_t *iov; + ngx_array_t *header, *trailer; + ngx_hunk_t *file; + ngx_chain_t *ch; + + ch = in; + file = NULL; + + ngx_test_null(header, ngx_create_array(cn->pool, 10, sizeof(ngx_iovec_t)), + (ngx_chain_t *) -1); + + ngx_test_null(trailer, ngx_create_array(cn->pool, 10, sizeof(ngx_iovec_t)), + (ngx_chain_t *) -1); + + do { + header->nelts = 0; + trailer->nelts = 0; + + if (ch->hunk->type & (NGX_HUNK_IN_MEMORY | NGX_HUNK_FLUSH)) { + last = NULL; + iov = NULL; + + while (ch + && (ch->hunk->type & (NGX_HUNK_IN_MEMORY | NGX_HUNK_FLUSH))) + { + if (ch->hunk->type & NGX_HUNK_FLUSH) + continue; + + if (last == ch->hunk->pos.p) { + iov->ngx_iov_len += ch->hunk->last.p - ch->hunk->pos.p; + + } else { + ngx_test_null(iov, ngx_push_array(header), + (ngx_chain_t *) -1); + iov->ngx_iov_base = ch->hunk->pos.p; + iov->ngx_iov_len = ch->hunk->last.p - ch->hunk->pos.p; + last = ch->hunk->last.p; + } + + ch = ch->next; + } + } + + if (ch && (ch->hunk->type & NGX_HUNK_FILE)) { + file = ch->hunk; + ch = ch->next; + } + +#if (HAVE_MAX_SENDFILE_IOVEC) + if (file && header->nelts > HAVE_MAX_SENDFILE_IOVEC) { + rc = ngx_sendv(cn->fd, (ngx_iovec_t *) header->elts, header->nelts, + &sent); + } else { +#endif + if (ch && ch->hunk->type & (NGX_HUNK_IN_MEMORY | NGX_HUNK_FLUSH)) { + last = NULL; + iov = NULL; + + while (ch + && (ch->hunk->type & (NGX_HUNK_IN_MEMORY | NGX_HUNK_FLUSH))) + { + if (ch->hunk->type & NGX_HUNK_FLUSH) + continue; + + if (last == ch->hunk->pos.p) { + iov->ngx_iov_len += ch->hunk->last.p - ch->hunk->pos.p; + + } else { + ngx_test_null(iov, ngx_push_array(trailer), + (ngx_chain_t *) -1); + iov->ngx_iov_base = ch->hunk->pos.p; + iov->ngx_iov_len = ch->hunk->last.p - ch->hunk->pos.p; + last = ch->hunk->last.p; + } + + ch = ch->next; + } + } + + if (file) { + rc = ngx_sendfile(cn->fd, + (ngx_iovec_t *) header->elts, header->nelts, + file->fd, file->pos.f, + (size_t) (file->last.f - file->pos.f), + (ngx_iovec_t *) trailer->elts, trailer->nelts, + &sent, cn->log); + } else { + rc = ngx_sendv(cn->fd, (ngx_iovec_t *) header->elts, + header->nelts, (size_t *) &sent); + } +#if (HAVE_MAX_SENDFILE_IOVEC) + } +#endif + /* save sent for logging */ + + if (rc == -1) + return (ngx_chain_t *) -1; + + flush -= sent; + + for (ch = in; ch && !(ch->hunk->type & NGX_HUNK_LAST); ch = ch->next) { + if (sent >= ch->hunk->last.f - ch->hunk->pos.f) { + sent -= ch->hunk->last.f - ch->hunk->pos.f; + ch->hunk->last.f = ch->hunk->pos.f; + continue; + } + + ch->hunk->pos.f += sent; + break; + } + + } while (flush > 0); + + ngx_destroy_array(trailer); + ngx_destroy_array(header); + + return ch; +} diff --git a/src/event/ngx_event_write.h b/src/event/ngx_event_write.h new file mode 100644 index 000000000..a355866c8 --- /dev/null +++ b/src/event/ngx_event_write.h @@ -0,0 +1,13 @@ +#ifndef _NGX_EVENT_WRITE_H_INCLUDED_ +#define _NGX_EVENT_WRITE_H_INCLUDED_ + + +#include <ngx_types.h> +#include <ngx_hunk.h> +#include <ngx_connection.h> + +ngx_chain_t *ngx_event_write(ngx_connection_t *cn, ngx_chain_t *in, + off_t flush); + + +#endif /* _NGX_EVENT_WRITE_H_INCLUDED_ */ |
