# Redis RDB 复制原理

我们知道Redis 是一个内存数据库,所以我们需要做的就是将内存中的数据dump到磁盘即可,这通常有两种方式:

  1. 在dump时,redis不可用
  2. 在dump时,redis可用。这时,就需要保证读写互斥,那么为了保证在开始dump数据时的快照,那么就需要将当前进程dump时刻的数据形成一个快照,这件事如果由Redis来实现的话,有些难,同时也会导致实现复杂,那么很明显我们通过OS提供的fork机制来实现即可

在类Unix中,fork系统调用,通常用于创建进程,新创建的进程为当前进程的子进程,同时父子进程的数据相互隔离,所以很明显的满足了我们的需求:保存快照并dump到磁盘。

还记得如下代码么?在Redis初始化时放入事件循环中的时间事件。我们很容易想到:RDB的机制肯定具备某种触发条件,而这个条件往往与时间相关,所以我们直接将关注点放到该函数serverCron中,同时我们可以看到Redis将每1毫秒执行一次该函数,当然,由于Redis将会在处理完用户请求后才会触发该函数,所以,将会大于1毫秒。

void initServer() {

  aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

}
1
2
3
4
5

serverCron 函数

那么,我们直接将目光放到serverCron函数即可。我们看到在Redis初始化时将会设置rdb参数,将设置三个阈值:

  1. 1 小时以内发生至少 1 次内存数据修改
  2. 5 分钟以内发生至少 100 次内存数据修改
  3. 1 分钟以内发生至少 10000 次内存数据修改

这些数值如何带来的?考虑下:内存中的数据与磁盘中的数据的异同个数,如果太少,那么没必要rdb浪费性能,如果太多,那么尽早完成rdb,减少数据丢失量。

void initServerConfig() {

  appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */

  appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */

  appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

}void appendServerSaveParams(time_t seconds, int changes) {

  server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));

  server.saveparams[server.saveparamslen].seconds = seconds;

  server.saveparams[server.saveparamslen].changes = changes;

  server.saveparamslen++;

}int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

  // 传入参数都不使用

  REDIS_NOTUSED(eventLoop);

  REDIS_NOTUSED(id);

  REDIS_NOTUSED(clientData);

 ...

  for (j = 0; j < server.saveparamslen; j++) { // 遍历rdb参数并检测,是否触发RDB

    struct saveparam *sp = server.saveparams+j; // 获取当前参数

    if (server.dirty >= sp->changes && // 当前修改数据量大于设置的触发阈值

      server.unixtime-server.lastsave > sp->seconds) { // 当前时间到达设置的触发阈值

      rdbSaveBackground(server.rdb_filename); // 执行RDB

      break;

   }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

rdbSaveBackground 函数

该函数将会调用fork系统调用,完成子进程的创建,同时通过 if else 改变父子进程的执行路线:子进程完成RDB,父进程继续相应客户端事件。

int rdbSaveBackground(char *filename) {

  pid_t childpid;

  long long start;

  if (server.rdb_child_pid != -1) return REDIS_ERR; // 当前已经存在RDB进程,不允许同时存在多个RDB进程

  server.dirty_before_bgsave = server.dirty; // 记录RDB前脏数据个数

  start = ustime(); // 记录开始时间

  if ((childpid = fork()) == 0) { // 调用fork 调用,子进程将返回 0 

    int retval;

    // 子进程不需要fd,因为只需要完成dump操作

    if (server.ipfd > 0) close(server.ipfd);

    if (server.sofd > 0) close(server.sofd);

    retval = rdbSave(filename); // 执行保存

    exitFromChild((retval == REDIS_OK) ? 0 : 1); // 退出子进程

 } else {

    // 父进程代码,将记录状态返回继续处理客户都安事件

    server.stat_fork_time = ustime()-start;

    if (childpid == -1) {

      redisLog(REDIS_WARNING,"Can't save in background: fork: %s",

          strerror(errno));

      return REDIS_ERR;

   }

    redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);

    server.rdb_save_time_start = time(NULL);

    server.rdb_child_pid = childpid;

    updateDictResizePolicy();

    return REDIS_OK;

 }

  return REDIS_OK; 

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

rdbSave 函数

该函数将完成实际RDB过程,filename为rdb 存储文件名。可以看到这里将会打开临时rdb文件,然后将db中的数据写入到文件中,然后重命名临时rdb文件为filename,然后关闭资源。源码如下。

int rdbSave(char *filename) { 

  dictIterator *di = NULL;

  dictEntry *de;

  char tmpfile[256];

  char magic[10];

  int j;

  long long now = mstime();

  FILE *fp;

  rio rdb;

  uint64_t cksum;

  snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); // 将格式化字符串写入到tmpfile数组中

  fp = fopen(tmpfile,"w"); // 以写文件打开rdb文件

  if (!fp) {

    redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",

        strerror(errno));

    return REDIS_ERR;

 }

  rioInitWithFile(&rdb,fp); // 初始化

  if (server.rdb_checksum) // 写入校验号

    rdb.update_cksum = rioGenericUpdateChecksum;

  // 将版本信息写入到文件中

  snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); 

  if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;for (j = 0; j < server.dbnum; j++) { // 遍历redis db,通常我们只有一个db。将其中的数据写入文件中

    redisDb *db = server.db+j;

    dict *d = db->dict;

    if (dictSize(d) == 0) continue;

    di = dictGetSafeIterator(d);

    if (!di) {

      fclose(fp);

      return REDIS_ERR;

   }

    // 写入所属db信息

    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;

    if (rdbSaveLen(&rdb,j) == -1) goto werr;

    // 迭代db中的信息,获取所有的 key - value 写入文件中

    while((de = dictNext(di)) != NULL) { // 这里的写法参考下java的map中的entryset,混沌学堂的学员务必考虑下二级指针如何实现map

      sds keystr = dictGetKey(de);

      robj key, *o = dictGetVal(de);

      long long expire;

      initStaticStringObject(key,keystr); // 将key 转为 redis 的 String 类型的 robj

      expire = getExpire(db,&key); // 获取过期时间

      if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;

   }

    dictReleaseIterator(di); // 释放迭代器

 }

  di = NULL; 

  /* 写入结束号 */

  if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

  // 写入CRC64校验号,保证文件数据正确性

  cksum = rdb.cksum;

  memrev64ifbe(&cksum);

  rioWrite(&rdb,&cksum,8);

  // 保证数据落盘:刷新写缓冲区、将page cache 数据写入磁盘、关闭fd

  fflush(fp); 

  fsync(fileno(fp));

  fclose(fp);

  // 将临时文件名修改为正确文件名

  if (rename(tmpfile,filename) == -1) {

    redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));

    unlink(tmpfile);

    return REDIS_ERR;

 }

  // 重置所有数据,并关闭资源

  redisLog(REDIS_NOTICE,"DB saved on disk");

  server.dirty = 0;

  server.lastsave = time(NULL);

  server.lastbgsave_status = REDIS_OK;

  return REDIS_OK;

  werr:

  fclose(fp);

  unlink(tmpfile);

  redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));

  if (di) dictReleaseIterator(di);

  return REDIS_ERR;

}// 初始化 rio *r

static const rio rioFileIO = {

  rioFileRead, // 指定rio读操作

  rioFileWrite, // 指定rio写操作

  rioFileTell, // 指定获取文件读写position位置的操作

  NULL,     

  0,     

 { { NULL, 0 } }

};// 三个函数,均是直接调用 系统函数库 中的函数,非常简单~ 不懂的话 man 一下就行

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {

  if (r->update_cksum) r->update_cksum(r,buf,len);

  return r->write(r,buf,len);

}static inline size_t rioRead(rio *r, void *buf, size_t len) {

  if (r->read(r,buf,len) == 1) {

    if (r->update_cksum) r->update_cksum(r,buf,len);

    return 1;

 }

  return 0;

}static inline off_t rioTell(rio *r) {

  return r->tell(r);

}void rioInitWithFile(rio *r, FILE *fp) {

  *r = rioFileIO;

  r->io.file.fp = fp;

}// 将指针p处的数据写入文件,len指明写入长度

static int rdbWriteRaw(rio *rdb, void *p, size_t len) {

  if (rdb && rioWrite(rdb,p,len) == 0) // 直接调用rdb结构中的 rioFileWrite 函数

    return -1;

  return len;

}// 该宏定义用于初始化在栈上分配的String类型的robj

#define initStaticStringObject(_var,_ptr) do { \

  _var.refcount = 1; \

  _var.type = REDIS_STRING; \

  _var.encoding = REDIS_ENCODING_RAW; \

  _var.ptr = _ptr; \

} while(0);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

相关写入函数

本节将罗列并注释相关写入函数。rdbSaveKeyValuePair 为主入口函数,保存传入的key - value 、过期时间、类型信息,并将它们记录到rdb中,这些函数较为简单:根据类型遍历写入(读者注意其中为了节约内存使用的压缩列表和整形编码字符串、int 编码的 set 表)。源码如下。

// 将 key - value 、过期时间、类型 写入文件

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,

            long long expiretime, long long now)

{

  // 存在过期时间,那么保存国企时间即可

  if (expiretime != -1) {

    // 如果key 已经过期,那么直接忽略

    if (expiretime < now) return 0;

    if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;

    if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;

 }

  // 保存 type, key, value

  if (rdbSaveObjectType(rdb,val) == -1) return -1;

  if (rdbSaveStringObject(rdb,key) == -1) return -1;

  if (rdbSaveObject(rdb,val) == -1) return -1;

  return 1;

}// 保存robj的类型,根据类型写入1字节长度的类型值

int rdbSaveObjectType(rio *rdb, robj *o) {

  switch (o->type) {

    case REDIS_STRING: // 写入字符串

      return rdbSaveType(rdb,REDIS_RDB_TYPE_STRING);

    case REDIS_LIST: // 写入列表

      if (o->encoding == REDIS_ENCODING_ZIPLIST)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST_ZIPLIST);

      else if (o->encoding == REDIS_ENCODING_LINKEDLIST)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST);

      else

        redisPanic("Unknown list encoding");

    case REDIS_SET: // 写入集合

      if (o->encoding == REDIS_ENCODING_INTSET)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_SET_INTSET);

      else if (o->encoding == REDIS_ENCODING_HT)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_SET);

      else

        redisPanic("Unknown set encoding");

    case REDIS_ZSET: // 写入排序集合

      if (o->encoding == REDIS_ENCODING_ZIPLIST)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET_ZIPLIST);

      else if (o->encoding == REDIS_ENCODING_SKIPLIST)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET);

      else

        redisPanic("Unknown sorted set encoding");

    case REDIS_HASH: // 写入hash

      if (o->encoding == REDIS_ENCODING_ZIPLIST)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPLIST);

      else if (o->encoding == REDIS_ENCODING_HT)

        return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH);

      else

        redisPanic("Unknown hash encoding");

    default:

      redisPanic("Unknown object type");

 }

  return -1;

}// 类型写入只需要1个字节长度即可

int rdbSaveType(rio *rdb, unsigned char type) {

  return rdbWriteRaw(rdb,&type,1);

}// 写入字符串对象

int rdbSaveStringObject(rio *rdb, robj *obj) {

  if (obj->encoding == REDIS_ENCODING_INT) { // 整形编码

    return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);

 } else { // 正常编码,直接按照C语言的字符串写入即可

    redisAssertWithInfo(NULL,obj,obj->encoding == REDIS_ENCODING_RAW);

    return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));

 }

}// 将整形值直接写入或者转为字符串再写入

int rdbSaveLongLongAsStringObject(rio *rdb, long long value) {

  unsigned char buf[32];

  int n, nwritten = 0;

  int enclen = rdbEncodeInteger(value,buf);

  if (enclen > 0) { // 整形编码直接写入 buf 即可

    return rdbWriteRaw(rdb,buf,enclen);

 } else { // 否则编码为字符串

    enclen = ll2string((char*)buf,32,value);

    redisAssert(enclen < 32);

    if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;

    nwritten += n;

    if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;

    nwritten += n;

 }

  return nwritten;

}// 所有类型的结构写入

int rdbSaveObject(rio *rdb, robj *o) {

  int n, nwritten = 0;if (o->type == REDIS_STRING) { // 写入字符串

    if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;

    nwritten += n;

 } else if (o->type == REDIS_LIST) { // 写入列表

    if (o->encoding == REDIS_ENCODING_ZIPLIST) { // 压缩列表,由于是数组保存,那么直接保存整个数组即可

      size_t l = ziplistBlobLen((unsigned char*)o->ptr);if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

      nwritten += n;

   } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { // 链表存储,那么遍历链表并写入

      list *list = o->ptr;

      listIter li;

      listNode *ln;if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1;

      nwritten += n;listRewind(list,&li);

      while((ln = listNext(&li))) {

        robj *eleobj = listNodeValue(ln);

        if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;

        nwritten += n;

     }

   } else {

      redisPanic("Unknown list encoding");

   }

 } else if (o->type == REDIS_SET) { // 保存集合

    if (o->encoding == REDIS_ENCODING_HT) { // hash 表存储,那么直接遍历 entry 写入

      dict *set = o->ptr;

      dictIterator *di = dictGetIterator(set);

      dictEntry *de;

      if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;

      nwritten += n;while((de = dictNext(di)) != NULL) {

        robj *eleobj = dictGetKey(de);

        if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;

        nwritten += n;

     }

      dictReleaseIterator(di);

   } else if (o->encoding == REDIS_ENCODING_INTSET) { // intset 存储,那么直接写入整块数组

      size_t l = intsetBlobLen((intset*)o->ptr);if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

      nwritten += n;

   } else {

      redisPanic("Unknown set encoding");

   }

 } else if (o->type == REDIS_ZSET) { // 排序集合写入

    if (o->encoding == REDIS_ENCODING_ZIPLIST) { // 保存压缩列表的整个数组

      size_t l = ziplistBlobLen((unsigned char*)o->ptr);if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

      nwritten += n;

   } else if (o->encoding == REDIS_ENCODING_SKIPLIST) { // 保存调表项,遍历并写入entry

      zset *zs = o->ptr;

      dictIterator *di = dictGetIterator(zs->dict);

      dictEntry *de;if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;

      nwritten += n;while((de = dictNext(di)) != NULL) {

        robj *eleobj = dictGetKey(de);

        double *score = dictGetVal(de);if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;

        nwritten += n;

        if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;

        nwritten += n;

     }

      dictReleaseIterator(di);

   } else { // 不支持的编码类型

      redisPanic("Unknown sorted set encoding");

   }

 } else if (o->type == REDIS_HASH) { // 保存hash表

    if (o->encoding == REDIS_ENCODING_ZIPLIST) { // 保存压缩列表的整个数组

      size_t l = ziplistBlobLen((unsigned char*)o->ptr);if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;

      nwritten += n;} else if (o->encoding == REDIS_ENCODING_HT) { // 保存hash表中的entry

      dictIterator *di = dictGetIterator(o->ptr);

      dictEntry *de;if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;

      nwritten += n;while((de = dictNext(di)) != NULL) {

        robj *key = dictGetKey(de);

        robj *val = dictGetVal(de);if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;

        nwritten += n;

        if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;

        nwritten += n;

     }

      dictReleaseIterator(di);

   } else {

      redisPanic("Unknown hash encoding");

   }} else {

    redisPanic("Unknown object type");

 }

  return nwritten;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403