summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_controller.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_controller.c')
-rw-r--r--src/nxt_controller.c222
1 files changed, 183 insertions, 39 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index f9b2cf26..a61c127d 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -39,14 +39,21 @@ typedef struct {
} nxt_controller_response_t;
+static nxt_int_t nxt_controller_prefork(nxt_task_t *task,
+ nxt_process_t *process, nxt_mp_t *mp);
+static nxt_int_t nxt_controller_start(nxt_task_t *task,
+ nxt_process_data_t *data);
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_controller_send_current_conf(nxt_task_t *task);
static void nxt_controller_router_ready_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static void nxt_controller_remove_pid_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static void nxt_controller_flush_requests(nxt_task_t *task);
static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
@@ -75,12 +82,15 @@ static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
static void nxt_controller_process_config(nxt_task_t *task,
nxt_controller_request_t *req, nxt_str_t *path);
+static nxt_bool_t nxt_controller_check_postpone_request(nxt_task_t *task);
#if (NXT_TLS)
static void nxt_controller_process_cert(nxt_task_t *task,
nxt_controller_request_t *req, nxt_str_t *path);
static void nxt_controller_process_cert_save(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name);
+static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj,
+ void *data);
#endif
static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -103,6 +113,7 @@ static nxt_uint_t nxt_controller_listening;
static nxt_uint_t nxt_controller_router_ready;
static nxt_controller_conf_t nxt_controller_conf;
static nxt_queue_t nxt_controller_waiting_requests;
+static nxt_bool_t nxt_controller_waiting_init_conf;
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
@@ -111,21 +122,117 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
-nxt_port_handlers_t nxt_controller_process_port_handlers = {
- .quit = nxt_worker_process_quit_handler,
+static const nxt_port_handlers_t nxt_controller_process_port_handlers = {
+ .quit = nxt_signal_quit_handler,
.new_port = nxt_controller_process_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.process_ready = nxt_controller_router_ready_handler,
.data = nxt_port_data_handler,
- .remove_pid = nxt_port_remove_pid_handler,
+ .remove_pid = nxt_controller_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};
-nxt_int_t
-nxt_controller_start(nxt_task_t *task, void *data)
+const nxt_process_init_t nxt_controller_process = {
+ .name = "controller",
+ .type = NXT_PROCESS_CONTROLLER,
+ .prefork = nxt_controller_prefork,
+ .restart = 1,
+ .setup = nxt_process_core_setup,
+ .start = nxt_controller_start,
+ .port_handlers = &nxt_controller_process_port_handlers,
+ .signals = nxt_process_signals,
+};
+
+
+static nxt_int_t
+nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
+{
+ ssize_t n;
+ nxt_int_t ret;
+ nxt_str_t *conf;
+ nxt_file_t file;
+ nxt_runtime_t *rt;
+ nxt_file_info_t fi;
+ nxt_controller_init_t ctrl_init;
+
+ nxt_log(task, NXT_LOG_INFO, "controller started");
+
+ rt = task->thread->runtime;
+
+ nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
+
+ conf = &ctrl_init.conf;
+
+ nxt_memzero(&file, sizeof(nxt_file_t));
+
+ file.name = (nxt_file_name_t *) rt->conf;
+
+ ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
+
+ if (ret == NXT_OK) {
+ ret = nxt_file_info(&file, &fi);
+
+ if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
+ conf->length = nxt_file_size(&fi);
+ conf->start = nxt_mp_alloc(mp, conf->length);
+ if (nxt_slow_path(conf->start == NULL)) {
+ nxt_file_close(task, &file);
+ return NXT_ERROR;
+ }
+
+ n = nxt_file_read(&file, conf->start, conf->length, 0);
+
+ if (nxt_slow_path(n != (ssize_t) conf->length)) {
+ conf->start = NULL;
+ conf->length = 0;
+
+ nxt_alert(task, "failed to restore previous configuration: "
+ "cannot read the file");
+ }
+ }
+
+ nxt_file_close(task, &file);
+ }
+
+#if (NXT_TLS)
+ ctrl_init.certs = nxt_cert_store_load(task, mp);
+
+ nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt);
+#endif
+
+ process->data.controller = ctrl_init;
+
+ return NXT_OK;
+}
+
+
+#if (NXT_TLS)
+
+static void
+nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data)
+{
+ pid_t main_pid;
+ nxt_array_t *certs;
+ nxt_runtime_t *rt;
+
+ certs = obj;
+ rt = data;
+
+ main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid;
+
+ if (nxt_pid == main_pid && certs != NULL) {
+ nxt_cert_store_release(certs);
+ }
+}
+
+#endif
+
+
+static nxt_int_t
+nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data)
{
nxt_mp_t *mp;
nxt_int_t ret;
@@ -144,15 +251,13 @@ nxt_controller_start(nxt_task_t *task, void *data)
nxt_queue_init(&nxt_controller_waiting_requests);
- init = data;
+ init = &data->controller;
#if (NXT_TLS)
-
if (init->certs != NULL) {
nxt_cert_info_init(task, init->certs);
nxt_cert_store_release(init->certs);
}
-
#endif
json = &init->conf;
@@ -167,8 +272,6 @@ nxt_controller_start(nxt_task_t *task, void *data)
}
conf = nxt_conf_json_parse_str(mp, json);
- nxt_free(json->start);
-
if (nxt_slow_path(conf == NULL)) {
nxt_alert(task, "failed to restore previous configuration: "
"file is corrupted or not enough memory");
@@ -245,6 +348,8 @@ nxt_controller_send_current_conf(nxt_task_t *task)
nxt_controller_conf_init_handler, NULL);
if (nxt_fast_path(rc == NXT_OK)) {
+ nxt_controller_waiting_init_conf = 1;
+
return;
}
@@ -266,6 +371,8 @@ nxt_controller_send_current_conf(nxt_task_t *task)
}
nxt_controller_listening = 1;
+
+ nxt_controller_flush_requests(task);
}
@@ -288,6 +395,28 @@ nxt_controller_router_ready_handler(nxt_task_t *task,
}
+static void
+nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_pid_t pid;
+ nxt_process_t *process;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid));
+
+ nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid));
+
+ process = nxt_runtime_process_find(rt, pid);
+ if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) {
+ nxt_controller_router_ready = 0;
+ }
+
+ nxt_port_remove_pid_handler(task, msg);
+}
+
+
static nxt_int_t
nxt_controller_conf_default(void)
{
@@ -322,6 +451,8 @@ nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
nxt_runtime_t *rt;
+ nxt_controller_waiting_init_conf = 0;
+
if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
nxt_alert(task, "failed to apply previous configuration");
@@ -343,6 +474,25 @@ nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_controller_listening = 1;
}
+
+ nxt_controller_flush_requests(task);
+}
+
+
+static void
+nxt_controller_flush_requests(nxt_task_t *task)
+{
+ nxt_queue_t queue;
+ nxt_controller_request_t *req;
+
+ nxt_queue_init(&queue);
+ nxt_queue_add(&queue, &nxt_controller_waiting_requests);
+
+ nxt_queue_init(&nxt_controller_waiting_requests);
+
+ nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
+ nxt_controller_process_request(task, req);
+ } nxt_queue_loop;
}
@@ -361,9 +511,8 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
- if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
- return NXT_DECLINED;
- }
+ nxt_assert(router_port != NULL);
+ nxt_assert(nxt_controller_router_ready);
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
@@ -961,7 +1110,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) {
- if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ if (nxt_controller_check_postpone_request(task)) {
nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
return;
}
@@ -1058,10 +1207,6 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
- if (rc == NXT_DECLINED) {
- goto no_router;
- }
-
/* rc == NXT_ERROR */
goto alloc_fail;
}
@@ -1076,7 +1221,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
- if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ if (nxt_controller_check_postpone_request(task)) {
nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
return;
}
@@ -1143,10 +1288,6 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
- if (rc == NXT_DECLINED) {
- goto no_router;
- }
-
/* rc == NXT_ERROR */
goto alloc_fail;
}
@@ -1193,16 +1334,27 @@ alloc_fail:
resp.offset = -1;
nxt_controller_response(task, req, &resp);
- return;
+}
-no_router:
- resp.status = 500;
- resp.title = (u_char *) "Router process isn't available.";
- resp.offset = -1;
+static nxt_bool_t
+nxt_controller_check_postpone_request(nxt_task_t *task)
+{
+ nxt_port_t *router_port;
+ nxt_runtime_t *rt;
- nxt_controller_response(task, req, &resp);
- return;
+ if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)
+ || nxt_controller_waiting_init_conf
+ || !nxt_controller_router_ready)
+ {
+ return 1;
+ }
+
+ rt = task->thread->runtime;
+
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ return (router_port == NULL);
}
@@ -1469,7 +1621,6 @@ static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_queue_t queue;
nxt_controller_request_t *req;
nxt_controller_response_t resp;
@@ -1502,14 +1653,7 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_controller_response(task, req, &resp);
- nxt_queue_init(&queue);
- nxt_queue_add(&queue, &nxt_controller_waiting_requests);
-
- nxt_queue_init(&nxt_controller_waiting_requests);
-
- nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
- nxt_controller_process_request(task, req);
- } nxt_queue_loop;
+ nxt_controller_flush_requests(task);
}