diff options
Diffstat (limited to '')
| -rw-r--r-- | src/stream/ngx_stream.c | 9 | ||||
| -rw-r--r-- | src/stream/ngx_stream.h | 2 | ||||
| -rw-r--r-- | src/stream/ngx_stream_core_module.c | 30 | ||||
| -rw-r--r-- | src/stream/ngx_stream_handler.c | 7 | ||||
| -rw-r--r-- | src/stream/ngx_stream_proxy_module.c | 140 | ||||
| -rw-r--r-- | src/stream/ngx_stream_upstream.h | 1 |
6 files changed, 159 insertions, 30 deletions
diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c index caaf38a68..3bd8f6dce 100644 --- a/src/stream/ngx_stream.c +++ b/src/stream/ngx_stream.c @@ -275,8 +275,11 @@ ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports, port = ports->elts; for (i = 0; i < ports->nelts; i++) { - if (p == port[i].port && sa->sa_family == port[i].family) { + if (p == port[i].port + && listen->type == port[i].type + && sa->sa_family == port[i].family) + { /* a port is already in the port list */ port = &port[i]; @@ -292,6 +295,7 @@ ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports, } port->family = sa->sa_family; + port->type = listen->type; port->port = p; if (ngx_array_init(&port->addrs, cf->temp_pool, 2, @@ -364,6 +368,7 @@ ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports) ls->addr_ntop = 1; ls->handler = ngx_stream_init_connection; ls->pool_size = 256; + ls->type = addr[i].opt.type; cscf = addr->opt.ctx->srv_conf[ngx_stream_core_module.ctx_index]; @@ -373,6 +378,8 @@ ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports) ls->backlog = addr[i].opt.backlog; + ls->wildcard = addr[i].opt.wildcard; + ls->keepalive = addr[i].opt.so_keepalive; #if (NGX_HAVE_KEEPALIVE_TUNABLE) ls->keepidle = addr[i].opt.tcp_keepidle; diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h index 21953e940..49efa4517 100644 --- a/src/stream/ngx_stream.h +++ b/src/stream/ngx_stream.h @@ -66,6 +66,7 @@ typedef struct { int tcp_keepcnt; #endif int backlog; + int type; } ngx_stream_listen_t; @@ -102,6 +103,7 @@ typedef struct { typedef struct { int family; + int type; in_port_t port; ngx_array_t addrs; /* array of ngx_stream_conf_addr_t */ } ngx_stream_conf_port_t; diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c index 0ecc448a4..ebc2b1c07 100644 --- a/src/stream/ngx_stream_core_module.c +++ b/src/stream/ngx_stream_core_module.c @@ -252,7 +252,7 @@ ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) in_port_t port; ngx_str_t *value; ngx_url_t u; - ngx_uint_t i; + ngx_uint_t i, backlog; struct sockaddr *sa; struct sockaddr_in *sin; ngx_stream_listen_t *ls; @@ -343,6 +343,7 @@ ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ls->socklen = u.socklen; ls->backlog = NGX_LISTEN_BACKLOG; + ls->type = SOCK_STREAM; ls->wildcard = u.wildcard; ls->ctx = cf->ctx; @@ -350,8 +351,17 @@ ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ls->ipv6only = 1; #endif + backlog = 0; + for (i = 2; i < cf->args->nelts; i++) { +#if !(NGX_WIN32) + if (ngx_strcmp(value[i].data, "udp") == 0) { + ls->type = SOCK_DGRAM; + continue; + } +#endif + if (ngx_strcmp(value[i].data, "bind") == 0) { ls->bind = 1; continue; @@ -367,6 +377,8 @@ ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) return NGX_CONF_ERROR; } + backlog = 1; + continue; } @@ -530,5 +542,21 @@ ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) return NGX_CONF_ERROR; } + if (ls->type == SOCK_DGRAM) { + if (backlog) { + return "\"backlog\" parameter is incompatible with \"udp\""; + } + +#if (NGX_STREAM_SSL) + if (ls->ssl) { + return "\"ssl\" parameter is incompatible with \"udp\""; + } +#endif + + if (ls->so_keepalive) { + return "\"so_keepalive\" parameter is incompatible with \"udp\""; + } + } + return NGX_CONF_OK; } diff --git a/src/stream/ngx_stream_handler.c b/src/stream/ngx_stream_handler.c index b3edb684f..d59d0b0cc 100644 --- a/src/stream/ngx_stream_handler.c +++ b/src/stream/ngx_stream_handler.c @@ -52,7 +52,7 @@ ngx_stream_init_connection(ngx_connection_t *c) * is the "*:port" wildcard so getsockname() is needed to determine * the server address. * - * AcceptEx() already gave this address. + * AcceptEx() and recvmsg() already gave this address. */ if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { @@ -166,7 +166,10 @@ ngx_stream_init_connection(ngx_connection_t *c) } } - if (cscf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + if (c->type == SOCK_STREAM + && cscf->tcp_nodelay + && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) + { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c index a83d627d7..ad3acbaf1 100644 --- a/src/stream/ngx_stream_proxy_module.c +++ b/src/stream/ngx_stream_proxy_module.c @@ -17,6 +17,7 @@ typedef struct { size_t buffer_size; size_t upload_rate; size_t download_rate; + ngx_uint_t responses; ngx_uint_t next_upstream_tries; ngx_flag_t next_upstream; ngx_flag_t proxy_protocol; @@ -167,6 +168,13 @@ static ngx_command_t ngx_stream_proxy_commands[] = { offsetof(ngx_stream_proxy_srv_conf_t, download_rate), NULL }, + { ngx_string("proxy_responses"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_num_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, responses), + NULL }, + { ngx_string("proxy_next_upstream"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -351,6 +359,7 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s) u->peer.log_error = NGX_ERROR_ERR; u->peer.local = pscf->local; + u->peer.type = c->type; uscf = pscf->upstream; @@ -370,6 +379,14 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s) u->proxy_protocol = pscf->proxy_protocol; u->start_sec = ngx_time(); + c->write->handler = ngx_stream_proxy_downstream_handler; + c->read->handler = ngx_stream_proxy_downstream_handler; + + if (c->type == SOCK_DGRAM) { + ngx_stream_proxy_connect(s); + return; + } + p = ngx_pnalloc(c->pool, pscf->buffer_size); if (p == NULL) { ngx_stream_proxy_finalize(s, NGX_ERROR); @@ -381,9 +398,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s) u->downstream_buf.pos = p; u->downstream_buf.last = p; - c->write->handler = ngx_stream_proxy_downstream_handler; - c->read->handler = ngx_stream_proxy_downstream_handler; - if (u->proxy_protocol #if (NGX_STREAM_SSL) && pscf->ssl == NULL @@ -488,7 +502,10 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s) cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module); - if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { + if (pc->type == SOCK_STREAM + && cscf->tcp_nodelay + && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) + { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay"); tcp_nodelay = 1; @@ -516,7 +533,7 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s) pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); #if (NGX_STREAM_SSL) - if (pscf->ssl && pc->ssl == NULL) { + if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { ngx_stream_proxy_ssl_init_connection(s); return; } @@ -544,23 +561,35 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s) c->log->action = "proxying connection"; - p = ngx_pnalloc(c->pool, pscf->buffer_size); - if (p == NULL) { - ngx_stream_proxy_finalize(s, NGX_ERROR); - return; + if (u->upstream_buf.start == NULL) { + p = ngx_pnalloc(c->pool, pscf->buffer_size); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + u->upstream_buf.start = p; + u->upstream_buf.end = p + pscf->buffer_size; + u->upstream_buf.pos = p; + u->upstream_buf.last = p; } - u->upstream_buf.start = p; - u->upstream_buf.end = p + pscf->buffer_size; - u->upstream_buf.pos = p; - u->upstream_buf.last = p; + if (c->type == SOCK_DGRAM) { + s->received = c->buffer->last - c->buffer->pos; + u->downstream_buf = *c->buffer; + + if (pscf->responses == 0) { + pc->read->ready = 0; + pc->read->eof = 1; + } + } u->connected = 1; pc->read->handler = ngx_stream_proxy_upstream_handler; pc->write->handler = ngx_stream_proxy_upstream_handler; - if (pc->read->ready) { + if (pc->read->ready || pc->read->eof) { ngx_post_event(pc->read, &ngx_posted_events); } @@ -894,11 +923,15 @@ ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) s = c->data; u = s->upstream; + c = s->connection; + pc = u->peer.connection; + + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + if (ev->timedout) { + ev->timedout = 0; if (ev->delayed) { - - ev->timedout = 0; ev->delayed = 0; if (!ev->ready) { @@ -907,20 +940,35 @@ ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) return; } - if (u->connected) { - pc = u->peer.connection; - - if (!c->read->delayed && !pc->read->delayed) { - pscf = ngx_stream_get_module_srv_conf(s, - ngx_stream_proxy_module); - ngx_add_timer(c->write, pscf->timeout); - } + if (u->connected && !c->read->delayed && !pc->read->delayed) { + ngx_add_timer(c->write, pscf->timeout); } return; } } else { + if (s->connection->type == SOCK_DGRAM) { + if (pscf->responses == NGX_MAX_INT32_VALUE) { + + /* + * successfully terminate timed out UDP session + * with unspecified number of responses + */ + + pc->read->ready = 0; + pc->read->eof = 1; + + ngx_stream_proxy_process(s, 1, 0); + return; + } + + if (u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + } + ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); ngx_stream_proxy_finalize(s, NGX_DECLINED); return; @@ -1039,6 +1087,21 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, c = s->connection; pc = u->connected ? u->peer.connection : NULL; + if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) { + + /* socket is already closed on worker shutdown */ + + handler = c->log->handler; + c->log->handler = NULL; + + ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown"); + + c->log->handler = handler; + + ngx_stream_proxy_finalize(s, NGX_OK); + return; + } + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); if (from_upstream) { @@ -1066,7 +1129,17 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, n = dst->send(dst, b->pos, size); + if (n == NGX_AGAIN && dst->shared) { + /* cannot wait on a shared socket */ + n = NGX_ERROR; + } + if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && !from_upstream) { + ngx_stream_proxy_next_upstream(s); + return; + } + ngx_stream_proxy_finalize(s, NGX_DECLINED); return; } @@ -1118,6 +1191,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, } } + if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses) + { + src->read->ready = 0; + src->read->eof = 1; + } + *received += n; b->last += n; do_write = 1; @@ -1126,6 +1205,11 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, } if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + src->read->eof = 1; } } @@ -1152,13 +1236,13 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, flags = src->read->eof ? NGX_CLOSE_EVENT : 0; - if (ngx_handle_read_event(src->read, flags) != NGX_OK) { + if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } if (dst) { - if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { + if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) { ngx_stream_proxy_finalize(s, NGX_ERROR); return; } @@ -1331,6 +1415,7 @@ ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf) conf->buffer_size = NGX_CONF_UNSET_SIZE; conf->upload_rate = NGX_CONF_UNSET_SIZE; conf->download_rate = NGX_CONF_UNSET_SIZE; + conf->responses = NGX_CONF_UNSET_UINT; conf->next_upstream_tries = NGX_CONF_UNSET_UINT; conf->next_upstream = NGX_CONF_UNSET; conf->proxy_protocol = NGX_CONF_UNSET; @@ -1373,6 +1458,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_size_value(conf->download_rate, prev->download_rate, 0); + ngx_conf_merge_uint_value(conf->responses, + prev->responses, NGX_MAX_INT32_VALUE); + ngx_conf_merge_uint_value(conf->next_upstream_tries, prev->next_upstream_tries, 0); diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h index 80520c2b3..1f4810cdf 100644 --- a/src/stream/ngx_stream_upstream.h +++ b/src/stream/ngx_stream_upstream.h @@ -84,6 +84,7 @@ typedef struct { ngx_buf_t upstream_buf; off_t received; time_t start_sec; + ngx_uint_t responses; #if (NGX_STREAM_SSL) ngx_str_t ssl_name; #endif |
