# Redis 深度分析系列二
Redis 深度分析系列一中我们了解到了Redis 使用 ANSI C标准编写而出,没有其他外部依赖项,同时我们知道C语言中,main函数作为入口函数,Redis的初始化和启动肯定也在其中,本节我们来从main函数开始来看看Redis在初始化和启动时完成的操作,同时找到接受并处理客户端请求的入口。
main 函数
通过源码我们得知如下流程信息:
- Redis在启动时,将会初始化服务器响应的配置信息,这些配置信息我们在后面用到时再详细讲解
- 解析参数并根据客户端参数加载并解析配置文件,然后进一步初始化Redis Server的变量信息
- 检测内核的内存分配策略参数
- 检测TCP FD与Unix FD
- 设置事件循环eventloop并启动主循环处理程序
- 结束后删除事件循环结构
int main(int argc, char **argv) {
struct timeval tv;
zmalloc_enable_thread_safeness(); // 设置变量 static int zmalloc_thread_safe = 1;
zmalloc_set_oom_handler(redisOutOfMemoryHandler); // 设置发生OOM时回调的函数
srand(time(NULL)^getpid()); // 使用当前进程的ID(getpid())与当前时间一起初始化随机数种子
gettimeofday(&tv,NULL); // 获取当前时间信息,放入timeval结构体
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); // 使用当前时间的秒信息(tv_sec)与毫秒信息(tv_usec) 与进程id 设置变量 static uint32_t dict_hash_function_seed = 5381,在hash函数中是用来散列key-value
server.sentinel_mode = checkForSentinelMode(argc,argv); // 从参数中解析,看看是否以哨兵模式启动,这里我们忽略
initServerConfig(); // 初始化Redis 服务器的相关配置信息
if (server.sentinel_mode) { // 初始化哨兵模式(忽略)
initSentinelConfig();
initSentinel();
}
if (argc >= 2) { // 解析参数
int j = 1;
sds options = sdsempty(); // 分配一个redis 字符串来保存参数(后面会详细解释sds,现在只需要知道它是一个字符串即可)
char *configfile = NULL;
// 根据参数值完成相应处理
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
if (argv[j][0] != '-' || argv[j][1] != '-')
configfile = argv[j++];
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
// 解析参数名
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
// 解析参数值
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
resetServerSaveParams(); // 重置服务器参数:server.saveparams = NULL
loadServerConfig(configfile,options); // 根据参数中指定的配置文件和参数初始化服务器配置
sdsfree(options); // 启动参数主要用于初始化服务器配置,使用完毕后释放其占用空间
} else { // 没有指定配置文件,那么打印警告日志
redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
// 如果指定Redis 作为后台进程运行,那么调用daemonize fork出另外的进程在后台执行
if (server.daemonize) daemonize();
initServer(); // 完成Redis Server的剩余变量初始化
if (server.daemonize) createPidFile(); // 若Redis在后台执行,那么创建pid文件,告知当前正在后台执行的Redis pid信息
redisAsciiArt(); // 打印Redis ascii编码的图形(忽略)
if (!server.sentinel_mode) { // 执行只有在不运行在哨兵模式时才需要的动作
redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
// 如果在Linux平台下,指定了 OvercommitMemory 为 0,那么打印警告日志(注:overcommit_memory=0, 表示内核将检查是否有足够的可用内存供应用进程使用,如果有足够 的可用内存,内存申请允许,否则,内存申请失败,并把错误返回给应用进程。 overcommit_memory=1, 表示内核允许分配所有的物理内存,而不管当前的内存状态如何。 overcommit_memory=2, 表示内核允许分配超过所有物理内存和swap空间总和的内存)
#ifdef __linux__
linuxOvercommitMemoryWarning();
#endif
loadDataFromDisk(); // 从磁盘中加载之前保存的RDB或者AOF文件
if (server.ipfd > 0) // 正确设置TCP socket FD 文件描述符,表明当前可以正确接收来自客户端的TCP连接
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
if (server.sofd > 0) // 正确设置Unix socket FD 文件描述符,表明当前可以正确接收来自本机进程的Unix套接字连接(什么是Unix套接字?一种本地进程通讯的方式,不经过协议栈,但可以使用Socket来完成通讯)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
}
// 检查用户设置的内存是否足够,需要大于1M
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
aeSetBeforeSleepProc(server.el,beforeSleep); // 设置执行睡眠前的回调函数:eventLoop->beforesleep = beforesleep;
aeMain(server.el); // 执行主事件循环,处理准备好的事件
aeDeleteEventLoop(server.el); // 退出循环,表明当前需要关闭Redis,那么关闭打开的事件循环
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
initServerConfig 函数
该函数用于初始化全局结构redisServer的变量,读者这里只需要观察初始化了哪些变量即可,不需要关注初始化的细节,因为这里面的值都是写死的默认值,对于可配置的变量将可以通过后面解析config文件的变量进行覆盖,当我们不需要使用这些变量时,不需要了解其意思,在后面函数中用到对应参数时,再详细讲解。源码如下。
// Redis 服务器全局结构,其中的变量,我们在用到时在分析,这里只需要了解拥有该结构即可
struct redisServer {
};
struct redisServer server;
void initServerConfig() {
... // 初始化普通变量
/* 初始化副本集配置 */
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.repl_state = REDIS_REPL_NONE;
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
server.repl_slave_ro = 1;
server.repl_down_since = time(NULL);
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
/* 初始化客户端输出缓冲区限制值 */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 1024*1024*256;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*64;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*32;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*8;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
/* 初始化浮点值计算变量 */
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero;
/* 初始化命令行参数表(采用hash表结构来存储) */
server.commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
/* 初始化慢日志参数 */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN;
/* 初始化调试参数 */
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
server.assert_line = 0;
server.bug_report_start = 0;
server.watchdog_period = 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
initServer 函数
该函数用于初始化RedisServer结构的剩余参数。该函数将会把Redis的动态信息进行初始化:
- 信号处理相关
- 全局结构
- 处理客户端连接的TCP端口、Unix端口和对应的处理函数:acceptTcpHandler、acceptUnixHandler(至此:我们看到客户端的输入事件将会在这两个函数处理,而该两个函数如何注册到事件循环中,以及事件循环如何接收客户端连接,我们下一篇详细讲解,这里了解即可,因为我们的目的是观察Redis的启动流程,以及涉及到的全局结构)
- Redis 数据库
- LUA 执行脚本
- 慢查询日志
- 后台执行线程
void initServer() {
int j;
signal(SIGHUP, SIG_IGN); // 屏蔽SIGHUP信号,SIG_IGN表示 SIG_IGNORE 忽略该信号(该信号讲在控制进程或者终端退出时发出。手册描述:Hangup detected on controlling terminal or death of controlling process)
signal(SIGPIPE, SIG_IGN);// 屏蔽SIGPIPE信号(该信号: 在写入pipe管道时,没有进程读取数据时发出。手册描述:Broken pipe: write to pipe with no readers)
setupSignalHandlers(); // 设置其他信号的处理函数
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
createSharedObjects(); // 创建redis共享对象:struct sharedObjectsStruct {},节省每次在使用共享对象时都需要分配和释放内存导致的性能损耗,考虑下常量池的设计
adjustOpenFilesLimit();// 调整redis 打开文件描述符的限制
server.el = aeCreateEventLoop(server.maxclients+1024); // 创建事件循环线程
server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 创建redis 数据库对象:typedef struct redisDb {} redisDb
// 初始化 TCP 端口
if (server.port != 0) {
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
if (server.ipfd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening port %d: %s",
server.port, server.neterr);
exit(1);
}
}
// 初始化Unix 端口
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1);
}
}
// 检测端口是否有效
if (server.ipfd < 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
// 创建Redis 数据库
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
// 初始化其他参数
...
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); // 创建时间事件函数 :serverCron
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); // 创建处理TCP客户端连接事件处理函数:acceptTcpHandler
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); // 创建处理unix客户端连接处理函数:acceptUnixHandler
if (server.aof_state == REDIS_AOF_ON) { // 打开AOF持久化文件
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
scriptingInit(); // 初始化LUA 脚本执行环境
slowlogInit(); // 初始化慢查询日志模块
bioInit(); // 创建后台执行线程
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
setupSignalHandlers 函数
该函数主要用于设置进程信号处理函数。响应描述如下。该函数不重要,如果读者没有OS信号处理机制,较难理解它干了什么,这里只需要知道信号对应处理函数即可,等后面了解了进程如何处理信号时,将手到擒来。源码如下。
static void sigtermHandler(int sig) { // 接收到终止信号时调用,打印日志
REDIS_NOTUSED(sig);
redisLogFromHandler(REDIS_WARNING,"Received SIGTERM, scheduling shutdown...");
server.shutdown_asap = 1;
}
void setupSignalHandlers(void) {
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = sigtermHandler;
sigaction(SIGTERM, &act, NULL); // 设置SIGTERM信号处理函数(接收终止信号时发出该信号)
#ifdef HAVE_BACKTRACE // 使用backtrace函数时设置处理函数为:sigsegvHandler
sigemptyset(&act.sa_mask);
act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
act.sa_sigaction = sigsegvHandler;
sigaction(SIGSEGV, &act, NULL);
sigaction(SIGBUS, &act, NULL);
sigaction(SIGFPE, &act, NULL);
sigaction(SIGILL, &act, NULL);
#endif
return;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
createSharedObjects 函数
该函数较为简单,就是常量池的设计,将公用的、不变的数据初始化,后面用到时不需要分配和释放。这些公用的对象较多为字符串对象,对于Redis的对象表示和字符串表示这里先暂时不需要管,我们主要看主流程,对于对象的表示将放到下一篇描述。源码如下。
void createSharedObjects(void) {
int j;
shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
shared.cnegone = createObject(REDIS_STRING,sdsnew(":-1\r\n"));
shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
"-ERR Operation against a key holding the wrong kind of value\r\n"));
shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
"-ERR no such key\r\n"));
shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
"-ERR syntax error\r\n"));
shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
"-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
"-ERR index out of range\r\n"));
shared.noscripterr = createObject(REDIS_STRING,sdsnew(
"-NOSCRIPT No matching script. Please use EVAL.\r\n"));
shared.loadingerr = createObject(REDIS_STRING,sdsnew(
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
shared.masterdownerr = createObject(REDIS_STRING,sdsnew(
"-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
"-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
shared.roslaveerr = createObject(REDIS_STRING,sdsnew(
"-READONLY You can't write against a read only slave.\r\n"));
shared.oomerr = createObject(REDIS_STRING,sdsnew(
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
for (j = 0; j < REDIS_SHARED_SELECT_CMDS; j++) {
shared.select[j] = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"select %d\r\n", j));
}
shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
shared.del = createStringObject("DEL",3);
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
shared.lpush = createStringObject("LPUSH",5);
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
shared.integers[j]->encoding = REDIS_ENCODING_INT;
}
for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) {
shared.mbulkhdr[j] = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"*%d\r\n",j));
shared.bulkhdr[j] = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),"$%d\r\n",j));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
adjustOpenFilesLimit 函数
我们知道Linux操作系统将会在rlimit中限制进程打开的文件数,该函数将尝试根据配置的最大客户端数来提高打开文件的最大数量。 它还将包含32个额外的文件描述符,因为Redis还需要一些文件描述符用于持久性、侦听套接字、日志文件等。 如果不能将限制相应地设置为配置的最大客户端数量,该函数将执行保留当前服务器的设置。源码如下。
void adjustOpenFilesLimit(void) {
rlim_t maxfiles = server.maxclients+32; // 增加描述符数量
struct rlimit limit;
if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { // 获取当前系统的描述符信息
redisLog(REDIS_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
strerror(errno));
server.maxclients = 1024-32;
} else { // 尝试修改当前进程的FD配置
rlim_t oldlimit = limit.rlim_cur;
if (oldlimit < maxfiles) {
rlim_t f;
f = maxfiles;
while(f > oldlimit) {
limit.rlim_cur = f;
limit.rlim_max = f;
if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
f -= 128;
}
if (f < oldlimit) f = oldlimit;
if (f != maxfiles) {
server.maxclients = f-32;
redisLog(REDIS_WARNING,"Unable to set the max number of files limit to %d (%s), setting the max clients configuration to %d.",
(int) maxfiles, strerror(errno), (int) server.maxclients);
} else {
redisLog(REDIS_NOTICE,"Max number of open files set to %d",
(int) maxfiles);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
slowlogInit 函数
该函数用于初始化慢查询日志,可以看到这里将使用list列表来完成日志的存储。同样,对于列表对象的组织,我们在下一篇进行讲解。
void slowlogInit(void) {
server.slowlog = listCreate();
server.slowlog_entry_id = 0;
listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
2
3
4
5
6
7
8
9
bioInit 函数
该函数用于创建后台任务执行线程,可以看到所谓Redis是单线程,指的仅仅是响应客户端的线程为单线程,并不代表Redis进程本身只有一个线程。该方法将利用pthread 线程库完成相应处理,读者了解pthread线程编程技术将更容易理解,不理解也没关系,跟着笔者的注释即可~。相关描述如下。
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* 初始化线程互斥锁和条件变量,也即:控制线程互斥的结构 */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate(); // 初始化每个线程的任务队列列表
bio_pending[j] = 0;
}
/* 初始化线程栈大小 */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1;
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* 创建线程,参数为:当前线程的序号,函数执行函数为 bioProcessBackgroundJobs */
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
}
}
// 每个线程执行的函数
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg; // 该值为线程序号,上面看到过,可用于获取当前线程的互斥锁信息
sigset_t sigset;
pthread_detach(pthread_self());
pthread_mutex_lock(&bio_mutex[type]); // 获取当前线程自己的互斥锁
// 设置线程处理信号
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
redisLog(REDIS_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// 循环处理任务列表中放入的任务
while(1) {
listNode *ln;
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
ln = listFirst(bio_jobs[type]);
job = ln->value;
pthread_mutex_unlock(&bio_mutex[type]);
if (type == REDIS_BIO_CLOSE_FILE) { // 当前线程类型为执行BIO关闭文件操作
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {// 当前线程类型为执行AOF操作
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
aeSetBeforeSleepProc 函数
该函数较为简单,仅仅保留一个函数指针而已。在事件循环线程执行sleep操作之前回调。这里的回调实现较为简单,仅仅处理未处理的命令并写入AOF数据,因为每次执行后,如果配置了AOF,那么需要持久化到磁盘。
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}
// 回调函数实现
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
listNode *ln;
redisClient *c;
/* 尝试为刚刚解除阻塞的客户端处理未执行的命令 */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
c->flags &= ~REDIS_UNBLOCKED;
/* 处理客户端输入缓冲区中的剩余数据 */
if (c->querybuf && sdslen(c->querybuf) > 0) {
server.current_client = c;
processInputBuffer(c);
server.current_client = NULL;
}
}
/* 将AOF缓冲区的数据写入磁盘 */
flushAppendOnlyFile(0);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
aeMain 函数
该函数将有主线程完成调用,接收事件循环的事件完成相应处理。我们看到aeProcessEvents(eventLoop, AE_ALL_EVENTS)为主执行入口,那么我们下一篇将从该点开始,看看Redis的事件循环原理和具体处理的细节。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) { // 当没有显示标志停止时不断循环处理事件循环中的线程
if (eventLoop->beforesleep != NULL) // 在执行处理事件时回调beforesleep,因为操作底层IO事件时可能导致线程阻塞
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 完成实际事件处理,可以看到这里将处理所有类型的事件(AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) #define AE_FILE_EVENTS 1 #define AE_TIME_EVENTS 2 ,事件类型分为:文件fd的事件、时间超时事件)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
aeDeleteEventLoop 函数
该函数将在主线程完成处理后退出,然后释放事件循环结构。
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop); // 处理实际事件循环fd的处理,比如:关闭epll的fd
// 释放结构占用空间
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
2
3
4
5
6
7
8
9
10
11
12
13