# Nginx 整体启动流程二
ngx_master_process_cycle 函数
该函数为 Nginx 工作在 Master - Slave 模式下创建多个事件循环并多进程并行执行IO事件,同样做法和 Netty 一样一样的:每个事件循环自己一个多路复用器。该函数的核心原理便是在 for 循环中启动 worker 进程,同时接收 nginx 命令发送过来的信号,通过信号机制 通知子进程完成不同的事件响应,核心机制便是:Linux 信号处理机制,将会在混沌学堂中描述。
void ngx_master_process_cycle(ngx_cycle_t *cycle)
{
...
// 设置响应信号集
sigemptyset(&set);
sigaddset(&set, SIGCHLD);
sigaddset(&set, SIGALRM);
sigaddset(&set, SIGIO);
sigaddset(&set, SIGINT);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
// 屏蔽上述信号集的信号,避免在执行过程中被上述信号打断执行
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
sigemptyset(&set); // 清空信号集
size = sizeof(master_process);
// 设置 master 进程名
for (i = 0; i < ngx_argc; i++) {
size += ngx_strlen(ngx_argv[i]) + 1;
}
title = ngx_pnalloc(cycle->pool, size);
if (title == NULL) {
exit(2);
}
p = ngx_cpymem(title, master_process, sizeof(master_process) - 1);
for (i = 0; i < ngx_argc; i++) {
*p++ = ' ';
p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size);
}
ngx_setproctitle(title);
// 获取核心模块的配置信息
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
// 启动 worker 进程
ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN);
// 启动缓存管理进程(该进程后文分析,将会用于清理无用的缓存空间)
ngx_start_cache_manager_processes(cycle, 0);
...
delay = 0;
sigio = 0;
for ( ;; ) { // 循环处理,直到检测退出
if (delay) { // 指定延迟执行,此时将通过 setitimer 设置一个定时器,定时器到后将会给当前进程发送 SIGALRM 信号
if (ngx_sigalrm) {
sigio = 0;
delay *= 2;
ngx_sigalrm = 0;
}
itv.it_interval.tv_sec = 0;
itv.it_interval.tv_usec = 0;
itv.it_value.tv_sec = delay / 1000;
itv.it_value.tv_usec = (delay % 1000 ) * 1000;
if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"setitimer() failed");
}
}
sigsuspend(&set); // 阻塞当前进程执行,直到接收到信号(也即等待上述的定时器到期,当然,有其他可能被其他信号唤醒,但若是正常执行,那么应该由上述的定时器的 SIGARLM 信号唤醒)
ngx_time_update(); // 更新时间
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"wake up, sigio %i", sigio); // 通过该debug日志也可以看到当前被唤醒的信号
if (ngx_reap) { // 处理子进程退出事件
ngx_reap = 0;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
live = ngx_reap_children(cycle);
}
if (!live && (ngx_terminate || ngx_quit)) { // 主进程退出
ngx_master_process_exit(cycle);
}
if (ngx_terminate) { // nginx 终止,那么通知子进程退出,通过发送 SIGKILL 信号
if (delay == 0) {
delay = 50;
}
if (sigio) {
sigio--;
continue;
}
sigio = ccf->worker_processes + 2 // 加上两个缓存管理进程;
if (delay > 1000) { // 延迟时间大于1秒,那么发送 SIGKILL 信号
ngx_signal_worker_processes(cycle, SIGKILL);
} else { // 否则发送 SIGTERM 信号
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_TERMINATE_SIGNAL));
}
continue;
}
if (ngx_quit) { // 通知子进程 nginx 退出
ngx_signal_worker_processes(cycle,ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
ngx_close_listening_sockets(cycle);
continue;
}
if (ngx_reconfigure) { // 通知子进程重配置
...
}
if (ngx_restart) { // 通知子进程重新执行
...
}
if (ngx_reopen) { // 通知子进程重新打开文件
...
}
if (ngx_change_binary) { // Nginx 二进制文件发生变化(用于在线更新Nginx,后文再聊)
...
}
if (ngx_noaccept) { // 通知子进程暂时不要接收客户端连接
...
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# ngx_start_worker_processes 函数
该函数用于孵化(spawn)子进程,注意:子进程将会复制 父进程中的数据,所以当 fork 返回后 父子进程的数据独立。而为了让子进程访问父进程的数据,所以需要在新的数据域中保存自己的数据,而不覆盖父进程的数据,如:ngx_processes 数组。而对于父子进程通讯来说,这里采用的是 unix_socket 来创建一对 socket fd 互相通讯。
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
ngx_int_t i;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");
for (i = 0; i < n; i++) { // 根据nginx.conf中的配置文件的 worker_processes 变量来创建子进程
ngx_spawn_process(cycle, ngx_worker_process_cycle,
(void *) (intptr_t) i, "worker process", type);
ngx_pass_open_channel(cycle); // 将新创建的 socket 信息传递给其他 worker 进程
}
}
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn){
u_long on;
ngx_pid_t pid;
ngx_int_t s;
if (respawn >= 0) { // 若传递参数大于等于0,表示直接在某个下标处创建,直接把 respawn 作为 下标
s = respawn;
} else {
for (s = 0; s < ngx_last_process; s++) { // 在ngx_processes数组中找到一个 pid为-1,也即没有子进程的下标,用于存放新的子进程的信息
if (ngx_processes[s].pid == -1) {
break;
}
}
if (s == NGX_MAX_PROCESSES) { // 超过最大进程 #define NGX_MAX_PROCESSES 1024
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"no more than %d processes can be spawned",
NGX_MAX_PROCESSES);
return NGX_INVALID_PID;
}
}
if (respawn != NGX_PROCESS_DETACHED) { // 非nginx 热更新(后文会描述),需要与master 建立 父子通道
// Solaris 9 OS 仍旧不存在 AF_LOCAL 标志,所以建立父子进程通道时,我们使用UNIX_SOCKET 来完成父子通讯,至于其原理,参考另一篇文章(注:socketpair 创建的一对 fd 将会放在 channel数组中)
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}
// 设置父子通道 fd 均为 非阻塞(为了能够在多路复用器中使用)
if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
ngx_nonblocking_n " failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
on = 1; // 启用标志位
// 将通道的第一个 fd 设置 可以在IO时产生异步信号(当新的IO事件发生时,发送 SIGIO 信号,通知发生了事件)
if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"ioctl(FIOASYNC) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 设置将要在该fd上接收 SIGIO 信号的进程 pid
if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(F_SETOWN) failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
// 设置当进程在执行新的程序(调用exec簇函数时,将该fd关闭),这里对父子进程都设置该标志位
if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
}
ngx_channel = ngx_processes[s].channel[1]; // 数组中第二个fd 保存到全局变量中
} else {
ngx_processes[s].channel[0] = -1;
ngx_processes[s].channel[1] = -1;
}
ngx_process_slot = s; // 保存子进程的数组下标(我们使用FORK调用,所以子进程将继承父进程的数据,所以这里提前保存)
pid = fork(); // 创建子进程:父进程 pid变量 为子进程的pid、子进程的 pid变量 为0
switch (pid) {
case -1: // 创建失败
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
case 0: // 子进程,保存父进程 pid (ngx_pid)到 ngx_parent,保存 自身pid 到 ngx_pid,然后执行传入的函数指针:ngx_worker_process_cycle
ngx_parent = ngx_pid;
ngx_pid = ngx_getpid();
proc(cycle, data);
break;
default:
break;
}
// 设置进程数组的 pid 和 退出标志位
ngx_processes[s].pid = pid;
ngx_processes[s].exited = 0;
if (respawn >= 0) {
return pid;
}
// 设置进程其他属性
ngx_processes[s].proc = proc;
ngx_processes[s].data = data;
ngx_processes[s].name = name;
ngx_processes[s].exiting = 0;
switch (respawn) { // 根据 respawn 变量对进程的特有属性 设置(我们这里是 NGX_PROCESS_RESPAWN)
case NGX_PROCESS_NORESPAWN:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_SPAWN:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_JUST_RESPAWN:
ngx_processes[s].respawn = 1;
ngx_processes[s].just_spawn = 1;
ngx_processes[s].detached = 0;
break;
case NGX_PROCESS_DETACHED:
ngx_processes[s].respawn = 0;
ngx_processes[s].just_spawn = 0;
ngx_processes[s].detached = 1;
break;
}
if (s == ngx_last_process) { // 当前索引下标为最后一个下标,那么对 ngx_last_process + 1
ngx_last_process++;
}
return pid;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# ngx_pass_open_channel 函数
将新创建的 socket 信息传递给其他 worker 进程。我们看到该函数将 子进程的信息 包装到 ngx_channel_t结构,而该结构又嵌入到 msghdr 消息结构中,最后调用 sendmsg 发送过去。从源码中我没得到一个关键信息:每个进程自己的 channel [1] 留给自己使用,channel [0] 递交给其他进程与之通讯。
static void ngx_pass_open_channel(ngx_cycle_t *cycle)
{
ngx_int_t i;
ngx_channel_t ch; // 保存子进程的相关数据
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_OPEN_CHANNEL;
ch.pid = ngx_processes[ngx_process_slot].pid; // 子进程的进程数据下标
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0]; // socket 对中的第一个保存为通道对象 fd,也即其他子进程通过第一个 socket fd 与当前子进程通讯
for (i = 0; i < ngx_last_process; i++) { // 遍历所有子进程,找到 下标不为 ngx_process_slot 且有效子进程,将新创建的子进程的封装信息 ngx_channel_t 传递过去
if (i == ngx_process_slot
|| ngx_processes[i].pid == -1
|| ngx_processes[i].channel[0] == -1)
{
continue;
}
ngx_write_channel(ngx_processes[i].channel[0], // 与子进程通讯时,使用子进程的第一个 socket fd
&ch, sizeof(ngx_channel_t), cycle->log);
}
}
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log){
ssize_t n;
ngx_err_t err;
struct iovec iov[1];
struct msghdr msg;
if (ch->fd == -1) { // 子进程创建的通道fd 失败
msg.msg_accrights = NULL;
msg.msg_accrightslen = 0;
} else { // 将有效信息设置到 msg 中。这里为 通讯的 fd和 fd 长度
msg.msg_accrights = (caddr_t) &ch->fd;
msg.msg_accrightslen = sizeof(int);
}
// 将通道数据放入 io向量
iov[0].iov_base = (char *) ch;
iov[0].iov_len = size;
// io向量封装到消息结构中
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
n = sendmsg(s, &msg, 0); // 将消息传递到指定进程的通讯 fd 中
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44