# Redis AOF 原理
前面我们详细描述了RDB的原理:
- 每1毫秒回调serverCron函数
- 检测RDB临界值:server.saveparams
- fork 子进程,由子进程完成数据写入
本文将详细解释保证数据安全的aof机制,该机制将会在每次执行完用户的写操作后,将写入数据实时同步到磁盘,但是,很容易想到:性能相对于RDB较低。为了找到 aof 的实现原理,我们这里需要找到修改内存数据的操作,笔者这里以最简单的 set 命令举例,其他操作与此相同。
set 命令原理
该命令我们可以很轻易的从源代码中看到:
- 尝试编码字符串为长整型以节约空间
- 将 key - value 保存到db的 dict 表中,而对于 dict 而言,就是标准的hash表实现
- 将 server.dirty (opens new window) 自增,此时表明内存中的数据已经被修改
void setCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]); // 尝试对字符串进行编码以节约空间
setGenericCommand(c,0,c->argv[1],c->argv[2],NULL,0);
}
// 字符串编码为整形节约空间
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
if (o->encoding != REDIS_ENCODING_RAW) // 已经完成编码
return o;
if (o->refcount > 1) return o; // 不能对共享对象编码,共享对象应该在 object space 中保存
redisAssertWithInfo(NULL,o,o->type == REDIS_STRING); // 当前函数只支持字符串类型编码
if (!string2l(s,sdslen(s),&value)) return o; // 尝试对字符串编码为整形值
// 此时可以完成对字符串的编码,如果字符串中的整型值处于共享整型值中,那么我们直接返回共享对象(REDIS_SHARED_INTEGERS 10000)
if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS) {
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else { // 否则我们将其编码为整形值保存
o->encoding = REDIS_ENCODING_INT;
sdsfree(o->ptr);
o->ptr = (void*) value;
return o;
}
}
// 尝试将字符串转为长整形值保存
int string2l(const char *s, size_t slen, long *lval) {
long long llval;
if (!string2ll(s,slen,&llval)) // 尝试转为 long long 类型
return 0;
// 超出界限
if (llval < LONG_MIN || llval > LONG_MAX)
return 0;
*lval = (long)llval; // 否则直接强转
return 1;
}
// 将编码后的字符串:key - value 保存
void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire, int unit) {
long long milliseconds = 0;
if (expire) { // 指定过期时间,那么计算过期时间
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK) // 从传递参数中获取指定的过期时间
return;
if (milliseconds <= 0) { // 无效过期时间
addReplyError(c,"invalid expire time in SETEX");
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
if (lookupKeyWrite(c->db,key) != NULL && nx) { // 如果当前key已经存在 同时为 setnx 命令,那么直接返回,因为setnx 不会覆盖原值
addReply(c,shared.czero);
return;
}
// 保存 key - value,并增加server.dirty值,表示内存值的修改次数
setKey(c->db,key,val);
server.dirty++;
if (expire) setExpire(c->db,key,mstime()+milliseconds); // 设置过期时间
addReply(c, nx ? shared.cone : shared.ok); // 响应客户端
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
处理流程分析
我们看到 set 命令 仅仅只是更改了 server.dirty (opens new window) 计数,本身并没有参与 aof 的操作,那么谁来执行aof操作呢?我们得回到前面描述的处理流程中(可别忘了前面我们描述的 ae 事件循环原理哦,这里我们省略掉无关代码)。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
...
acceptCommonHandler(cfd);
}
static void acceptCommonHandler(int fd) {
...
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(fd);
return;
}
...
}
redisClient *createClient(int fd) {
...
if (fd != -1) {
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR) // 可以看到当客户端发送数据后,回调该函数
{
close(fd);
zfree(c);
return NULL;
}
}
...
}
// 处理客户端发送指令
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
...
processInputBuffer(c);
...
}
void processInputBuffer(redisClient *c) {
...
if (processCommand(c) == REDIS_OK)
resetClient(c);
...
}
// 实际执行操作
int processCommand(redisClient *c) {
...
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 从命令表中获取指令函数指针
...
call(c,REDIS_CALL_FULL); // 调用命令,注意这里的 REDIS_CALL_FULL(描述: #define REDIS_CALL_FULL (REDIS_CALL_SLOWLOG | REDIS_CALL_STATS | REDIS_CALL_PROPAGATE) 注意这里包含了 REDIS_CALL_PROPAGATE )
}
void call(redisClient *c, int flags) {
...
c->cmd->proc(c); // 调用 函数指针 完成处理,这里就是我们上面描述的 set 命令
...
if (flags & REDIS_CALL_PROPAGATE) { // 执行标志位存在调用传播,那么传播
int flags = REDIS_PROPAGATE_NONE;
if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
flags |= REDIS_PROPAGATE_REPL; // 将修改传播给副本集
if (dirty) // 存在内存数据修改,那么增加 REDIS_PROPAGATE_AOF 传播
flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
propagate 函数
最终,我们看到在执行完set命令后,将会根据 flag 来调用propagate 函数,而我们制定了 call 函数的 flag 总是为 REDIS_CALL_FULL,其中包含了 REDIS_CALL_PROPAGATE 标志位,所以我们将关注点放到该函数即可。
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) // 当前没有关闭AOF机制,同时需要传播到AOF中,那么执行 feedAppendOnlyFile 函数完成AOF处理
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
feedAppendOnlyFile 函数
该函数将完成实际 AOF 操作。我们看到该函数将通过命令构建 sds buf ,并将该缓冲区数据添加到 aof_buf 中。源码如下。
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
...
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
// 过期命令我们将其转为 PEXPIREAT 命令方便在存储到文件中时保持精度,并且保存的时间总是绝对时间,而不是相对时间
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) { // SETEX/PSETEX 命令也需要转为 SET 和 PEXPIREAT 指令
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else { // 其他命令直接转为对应的append数据即可,我们的 set 命令便是如此
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
// 如果我们开启了AOF机制,那么我们将数据添加到 aof_buf 缓冲区中
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
// 如果后台AOF重写正在进行中,我们希望在缓冲区中保存子进程和当前文件之间的差异,这样当子进程完成它的工作时,我们可以将两者之间差异值保存到新的aof件中
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
那么,谁来处理 aof_buf 中的数据呢?还记得 beforeSleep 函数和 aeMain 函数么?很容易关联了吧?所以之前学到的东西并没有多余的,此时又再一次用到了,有时候你认为不重要的东西,到最后总会看到(pass:不学 点,怎么连成线?)
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
void beforeSleep(struct aeEventLoop *eventLoop) {
...
flushAppendOnlyFile(0); // 将aof 缓存区中的数据写入磁盘,参数 0 表示在后台线程中异步 fsync 将page cache 中的数据写入磁盘持久化
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
flushAppendOnlyFile 函数
那么我们直接看看该函数如何操作缓冲区即可。我们看到处理流程分为:
- 异步落盘(由后台线程完成处理)
- 同步落盘(主线程完成:write函数、fsync 函数)
// 控制 fsync 函数的调用时间(该函数用于将OS中文件的缓存实时与磁盘同步,保证数据落盘)
#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2
void flushAppendOnlyFile(int force) {
...
if (server.aof_fsync == AOF_FSYNC_EVERYSEC) // 指定每秒落盘一次
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; // 检查后台线程是否正在处理 REDIS_BIO_AOF_FSYNC 同步
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { // 指定 force 为 0 时,异步落盘
if (sync_in_progress) { // 存在未完成的异步落盘任务(也即后台执行线程仍旧在执行fsync函数,但是由于磁盘执行缓慢,存在任务还未处理),那么我们设置 aof_flush_postponed_start ,该函数将会在serverCron 函数中调用,相当于延迟一会儿再次调用该函数
if (server.aof_flush_postponed_start == 0) {
server.aof_flush_postponed_start = server.unixtime; // 设置推迟执行时间
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) { // 等待2秒钟以内,那么再次尝试异步落盘
return;
}
// 否则增加延迟fsync计数,因为可能磁盘写入过于缓慢,那么需要打印报警信息
server.aof_delayed_fsync++;
redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
server.aof_flush_postponed_start = 0; // 重置推迟时间
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); // 执行写入操作
if (nwritten != (signed)sdslen(server.aof_buf)) { // 写入失败,那么打印报警信息
if (nwritten == -1) { // 未完成任何写入
redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
} else { // 写入了部分信息,但是并没有完全写入
redisLog(REDIS_WARNING,"Exiting on short write while writing to "
"the append-only file: %s (nwritten=%ld, "
"expected=%ld)",
strerror(errno),
(long)nwritten,
(long)sdslen(server.aof_buf));
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { // 尝试清除文件中写入的部分信息,失败后报警
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
}
exit(1); // 退出redis进程
}
server.aof_current_size += nwritten; // 否则写入成功,那么aof计数信息
// 当缓冲区较小时,可以复用该缓冲区,否则我们分配一个空的缓冲区(内存与性能的平衡)
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
// 如果no-appendfsync-on-rewrite 参数设置为 1,并且有子进程在后台进行I/O操作,则不要进行fsync
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// 执行fsync(注意:write 函数仅仅只是将数据写入内核buffer中,并没有同步到磁盘,调用该函数后,数据才会完全落盘)
if (server.aof_fsync == AOF_FSYNC_ALWAYS) { // 总是执行fsync
aof_fsync(server.aof_fd); // 调用fsync完成落盘
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) { // 否则将fsync的操作交由 后台 线程完成处理
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}
// 在linux中,使用fdatasync,只同步数据,而不同步fd的元数据,比如:修改时间、用户信息等等,用于优化写入速度
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
AOF重写机制
因为AOF是通过保存被执行的写命令来记录数据库状态的,所以随着服务器的运行时间久,AOF的文件会变得越来越大,不仅占用系统资源,而且当通过AOF文件来进行数据还原时花费的额时间也会更久。为了解决AOF文件体积膨胀的问题,Redis提供了AOF文件重写功能(注意:新旧两个 aof 文件所保存的数据库状态是相同的,但是新的AOF文件不会包含任何浪费空间的冗余命令,通常体积会较旧AOF文件小很多。同时,AOF重写其实是一个有歧义的名字,实际上重写工作是针对当前最新 redis 数据库状态来进行的,重写过程中不会读写、也不适用原来的AOF文件):
- AOF文件重写功能是通过子进程来执行
- 不过子进程在进行AOF重写期间,服务器进程还会继续处理命令请求,而新的命令可能会对现有的数据库状态进行修改,从而导致服务器当前的数据库状态和重写后的AOF文件所保存的数据库状态不一致
- 为了解决这一个问题,Redis服务器设置了一个AOF重写缓冲区,这个缓冲区会在服务器进程创建子进程时使用,当redis服务器执行完一个写命令后,会把这个写命令发送给AOF缓冲区和AOF重写缓冲区(也即上述 RewriteBuffer)