# Redis 深度分析系列五

前面我们详细介绍了ae事件驱动框架的原理:根据操作系统类型,选择合适的多路复用器框架。同时也详细介绍了事件循环执行的流程:aeProcessEvents()函数。本节我们来看看 Redis 基于ae事件循环下的请求响应整体流程的实现。

anetTcpServer 函数

该函数用于初始化服务端套接字,熟悉C语言的网络编程的朋友应该熟悉该流程:

  1. 创建服务端套接字 Socket
  2. 绑定本地地址
  3. 开始监听

本函数亦是按照该流程创建,其中:anetCreateSocket 函数 和 anetListen 函数为通用函数,根据传入参数调用函数库完成socket创建和监听,可以复用Unix套接字与Socket套接字。描述如下。

int anetTcpServer(char *err, int port, char *bindaddr)

{

  int s;

  struct sockaddr_in sa;if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) // 创建 AF_INET 协议,为 TCP IP 协议

    return ANET_ERR;

  memset(&sa,0,sizeof(sa));

  sa.sin_family = AF_INET;

  sa.sin_port = htons(port); // 设置端口。h为host,n为net,s为short,也即将本机字节序转为网络字节序(注:网络字节序用大端序)

  sa.sin_addr.s_addr = htonl(INADDR_ANY); // 设置监听地址,INADDR_ANY 表示监听所有网卡对应port端口的数据,也即 0.0.0.0

  if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) { // 绑定地址

    anetSetError(err, "invalid bind address");

    close(s);

    return ANET_ERR;

 }

  if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) // 开始监听来自客户端的连接,此时如果产生TCP连接,那么将会进行TCP三次握手,并将握手成功的客户端放入backlog队列

    return ANET_ERR;

  return s;

}// 根据 domain 指定的协议簇创建socketfd

static int anetCreateSocket(char *err, int domain) {

  int s, on = 1;

  if ((s = socket(domain, SOCK_STREAM, 0)) == -1) { // 创建domain协议簇中的流协议,也即TCP协议

    anetSetError(err, "creating socket: %s", strerror(errno));

    return ANET_ERR;

 }

  // 设置socket属性:SOL_SOCKET表示在socket套接字上设置属性,SO_REUSEADDR 表示允许REDIS进程复用绑定地址。用于 redis 基准测试使用

  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {

    anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));

    return ANET_ERR;

 }

  return s;

}// 设置接收队列backlog的队列大小,同时允许socket可以接收客户端连接

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {

  if (bind(s,sa,len) == -1) { // 首先绑定地址

    anetSetError(err, "bind: %s", strerror(errno));

    close(s);

    return ANET_ERR;

 }

  // 随后设置backlog队列大小(backlog队列为TCP三次握手成功后放入的队列,用户态可以使用accept系统调用从中获取客户端连接)

  if (listen(s, 511) == -1) {

    anetSetError(err, "listen: %s", strerror(errno));

    close(s);

    return ANET_ERR;

 }

  return ANET_OK;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

anetUnixServer 函数

在Linux中,存在unix socket,称之为unix套接字,该套接字我们可以观察Linux内核源码:将不会进入TCP协议栈来处理数据,它将根据socket的函数指针在内存中构建一个伪服务端和客户端,然后在其中实现socket的功能,由于不走协议栈,所以处理速度很快,所以我们说:unix socket 是利用 socket 来实现的进程之间通讯的一种手段,仅此而已,笔者将会在后面详细分析其中的源码给读者解惑~(很快,下周吧~)。这里我们需要指定:协议簇为AF_LOCAL。源码描述如下。

int anetUnixServer(char *err, char *path, mode_t perm)

{

  int s;

  struct sockaddr_un sa;

  if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR) // 创建server socket

    return ANET_ERR;

  memset(&sa,0,sizeof(sa)); // 初始化sockaddr_un,第二参数指明将其中的值初始化为0

  sa.sun_family = AF_LOCAL;

  strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); // 将绑定路径设置为sun_path(unix 协议需要通过path来标记server 路径,我们可以通过该路径来找到server,也即抽象了ip和端口设置)

  if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) // 开始监听并设置接收队列大小

    return ANET_ERR;

  if (perm) // 如果设置权限,那么根据perm值,设置path的权限

    chmod(sa.sun_path, perm);

  return s;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

acceptTcpHandler 函数

该函数将在server socket接收到客户端连接时,回调其中首先通过anetTcpAccept函数接收来自客户端的连接,然后调用acceptCommonHandler函数完成客户端socket的处理。源码描述如下。

// 前面描述的initServer()函数代码,将acceptTcpHandler函数注册到打开监听的server socket fd上,当多路复用器发现有客户端连接时,将会在ae处理函数中回调该函数

if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,

                    acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

  int cport, cfd; // 客户端端口和socket fd

  char cip[128]; // 客户端 ip 

  // 为了避免编译器警告无用参数,所以这里将其使用 ((void) V)假装使用了一下

  REDIS_NOTUSED(el);

  REDIS_NOTUSED(mask);

  REDIS_NOTUSED(privdata);

  cfd = anetTcpAccept(server.neterr, fd, cip, &cport); // 接收客户端连接

  if (cfd == AE_ERR) {

    redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);

    return;

 }

  redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);

  acceptCommonHandler(cfd); // 处理客户端连接

}// 调用 anetGenericAccept 函数完成接收

int anetTcpAccept(char *err, int s, char *ip, int *port) {

  int fd;

  struct sockaddr_in sa;

  socklen_t salen = sizeof(sa);

  if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)

    return ANET_ERR;

  // 复制ip和port(这里需要处理字节序~)

  if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));

  if (port) *port = ntohs(sa.sin_port);

  return fd;

}// 循环调用系统调用accept接收客户端连接,这里唯一的继续条件:EINTR(Interrupted system call ),表示系统调用被中断,此时可以继续重试获取客户端连接

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {

  int fd;

  while(1) {

    fd = accept(s,sa,len); // 成功读取后,由Linux 内核完成 sockaddr *sa 设置 

    if (fd == -1) {

      if (errno == EINTR)

        continue;

      else {

        anetSetError(err, "accept: %s", strerror(errno));

        return ANET_ERR;

     }

   }

    break;

 }

  return fd;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

acceptUnixHandler 函数

该函数顾名思义,用于接收通过unix socket通讯的在同一台机器上的进程通讯。源码描述如下。

void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

  int cfd;

  REDIS_NOTUSED(el);

  REDIS_NOTUSED(mask);

  REDIS_NOTUSED(privdata);

  cfd = anetUnixAccept(server.neterr, fd);

  if (cfd == AE_ERR) {

    redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);

    return;

 }

  redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);

  acceptCommonHandler(cfd);

}// 描述同acceptTcpHandler函数,除了不对port和ip处理,因为它没有~

int anetUnixAccept(char *err, int s) {

  int fd;

  struct sockaddr_un sa;

  socklen_t salen = sizeof(sa);

  if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)

    return ANET_ERR;return fd;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

acceptCommonHandler 函数

该函数用于根据接收到的客户端socket创建描述客户端的元数据结构:redisClient。源码描述如下。

static void acceptCommonHandler(int fd) {

  redisClient *c;

  // 创建描述客户端的redis client结构

  if ((c = createClient(fd)) == NULL) {

    redisLog(REDIS_WARNING,"Error allocating resoures for the client");

    close(fd); 

    return;

 }

  // 若当前redis 客户端连接达到最大,那么向客户端写入错误信息

  if (listLength(server.clients) > server.maxclients) {

    char *err = "-ERR max number of clients reached\r\n";

    if (write(c->fd,err,strlen(err)) == -1) { // 注意:这里由于客户端是非阻塞调用,有可能由于客户端写缓冲区已经满了,写入失败,但是无所谓,这里只需要尽力发送即可 

   }

    server.stat_rejected_conn++; // 记录拒绝处理的客户端数量 

    freeClient(c); // 释放客户端元数据结构

    return;

 }

  server.stat_numconnections++; // 记录完成连接数

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

createClient 函数

redisClient *createClient(int fd) {

  redisClient *c = zmalloc(sizeof(redisClient)); // 分配客户端结构内存

  // fd 为 -1时,表示在其他上下文中执行,比如:lua 脚本。fd 不为 -1,那么表明为一个有效的远程客户端连接,那么需要将其设置为非阻塞模式,同时将其注册读事件,当socket的读缓冲区中存在数据时,那么将会在ae事件循环过程中回调readQueryFromClient函数完成业务处理

  if (fd != -1) { 

    anetNonBlock(NULL,fd);

    anetTcpNoDelay(NULL,fd);

    if (aeCreateFileEvent(server.el,fd,AE_READABLE,

               readQueryFromClient, c) == AE_ERR) // 将其注册到多路复用器中

   {

      close(fd);

      zfree(c);

      return NULL;

   }

 }

  // 初始化 redis client 结构的初始值,具体值,将会在ae检测到客户端发送的数据时,回调readQueryFromClient函数完成设置

  selectDb(c,0);

  c->fd = fd;

  c->bufpos = 0;

  c->querybuf = sdsempty();

  c->querybuf_peak = 0;

  c->reqtype = 0;

  c->argc = 0;

  c->argv = NULL;

  c->cmd = c->lastcmd = NULL;

  c->multibulklen = 0;

  c->bulklen = -1;

  c->sentlen = 0;

  c->flags = 0;

  c->ctime = c->lastinteraction = server.unixtime;

  c->authenticated = 0;

  c->replstate = REDIS_REPL_NONE;

  c->slave_listening_port = 0;

  c->reply = listCreate();

  c->reply_bytes = 0;

  c->obuf_soft_limit_reached_time = 0;

  listSetFreeMethod(c->reply,decrRefCount);

  listSetDupMethod(c->reply,dupClientReplyValue);

  c->bpop.keys = NULL;

  c->bpop.count = 0;

  c->bpop.timeout = 0;

  c->bpop.target = NULL;

  c->io_keys = listCreate();

  c->watched_keys = listCreate();

  listSetFreeMethod(c->io_keys,decrRefCount);

  c->pubsub_channels = dictCreate(&setDictType,NULL);

  c->pubsub_patterns = listCreate();

  listSetFreeMethod(c->pubsub_patterns,decrRefCount);

  listSetMatchMethod(c->pubsub_patterns,listMatchObjects);

  if (fd != -1) listAddNodeTail(server.clients,c); // 将客户端添加到客户端列表末尾

  initClientMultiState(c);

  return c;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

readQueryFromClient 函数

该函数将在客户端发送数据到socket读缓冲区时,由多路复用器检测到,然后ae事件循环调用该函数完成具体业务处理。可以看到这里将会把客户端数据读入querybuf,然后调用processInputBuffer函数完成命令的处理。源码描述如下。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

  redisClient *c = (redisClient*) privdata;

  int nread, readlen;

  size_t qblen;

  REDIS_NOTUSED(el);

  REDIS_NOTUSED(mask);

​

  server.current_client = c;

  readlen = REDIS_IOBUF_LEN;

  if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1

    && c->bulklen >= REDIS_MBULK_BIG_ARG)

 {

    int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);if (remaining < readlen) readlen = remaining;

 }

  qblen = sdslen(c->querybuf); 

  // 准备查询缓冲区,并读取客户端数据

  if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

  c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

  nread = read(fd, c->querybuf+qblen, readlen);

  if (nread == -1) { // 读取失败

    if (errno == EAGAIN) {

      nread = 0;

   } else {

      redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));

      freeClient(c);

      return;

   }

 } else if (nread == 0) { // 客户端关闭了连接

    redisLog(REDIS_VERBOSE, "Client closed connection");

    freeClient(c);

    return;

 }

  if (nread) { // 读取到数据后,增加SDS的长度

    sdsIncrLen(c->querybuf,nread);

    c->lastinteraction = server.unixtime;

 } else {

    server.current_client = NULL;

    return;

 }

  if (sdslen(c->querybuf) > server.client_max_querybuf_len) { // 查询缓冲区超过设置的最大值,此时为非法状态,那么释放占用内存,并打印日志

    sds ci = getClientInfoString(c), bytes = sdsempty();

​

    bytes = sdscatrepr(bytes,c->querybuf,64);

    redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);

    sdsfree(ci);

    sdsfree(bytes);

    freeClient(c);

    return;

 }

  processInputBuffer(c); // 处理客户端数据

  server.current_client = NULL;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

processInputBuffer 函数

该函数将会处理客户端传递过来的命令。源码描述如下。

void processInputBuffer(redisClient *c) {

  while(sdslen(c->querybuf)) {

    if (c->flags & REDIS_BLOCKED) return; // 如果客户端正在处理其他事情,则立即中止处理,表示处理阻塞

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; // 表示在向客户端写入响应信息后将立即关闭连接,设置了这个标志后不处理客户端的剩余命令

    if (!c->reqtype) { // 不确定的请求类型,那么根据第一个查询缓冲区是否为 * 设置请求类型(REDIS_REQ_INLINE 单个请求,REDIS_REQ_MULTIBULK 多个请求)

      if (c->querybuf[0] == '*') {

        c->reqtype = REDIS_REQ_MULTIBULK;

     } else {

        c->reqtype = REDIS_REQ_INLINE;

     }

   }

    // 处理查询缓冲区数据(也即切割缓冲区中的数据,生成:argc和argv)

    if (c->reqtype == REDIS_REQ_INLINE) { 

      if (processInlineBuffer(c) != REDIS_OK) break;

   } else if (c->reqtype == REDIS_REQ_MULTIBULK) {

      if (processMultibulkBuffer(c) != REDIS_OK) break;

   } else {

      redisPanic("Unknown request type");

   }

    if (c->argc == 0) { // 不存在参数,那么重置客户端(清理内存,并重新等待下一次发送的命令)

      resetClient(c);

   } else {

      if (processCommand(c) == REDIS_OK) // 调用相应命令处理客户端请求

        resetClient(c);

   }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

resetClient 函数

该函数重置部分redisClient结构变量值,同时释放argv占用的内存,准备下一次客户端发送命令并处理。源码如下。

void resetClient(redisClient *c) {

  freeClientArgv(c);

  c->reqtype = 0;

  c->multibulklen = 0;

  c->bulklen = -1;

  if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);

}
1
2
3
4
5
6
7
8
9
10
11
12
13

processCommand 函数

该函数被调用时,那么redis已经读取客户端发送的整个命令,参数在客户端 argv 与argc 变量中。该函数将执行客户端发送的命令。如果该返回1,则客户端仍然保持连接并且有效,调用方可以执行其他操作,如果该函数返回0,客户端应该被销毁。源码描述如下。

int processCommand(redisClient *c) {

  if (!strcasecmp(c->argv[0]->ptr,"quit")) { // 退出连接命令

    addReply(c,shared.ok);

    c->flags |= REDIS_CLOSE_AFTER_REPLY;

    return REDIS_ERR;

 }

  // 查找客户端请求执行的命令,若命令不存在,那么将错误信息发送给客户端

  c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

  if (!c->cmd) {

    addReplyErrorFormat(c,"unknown command '%s'",

             (char*)c->argv[0]->ptr);

    return REDIS_OK;

 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||

       (c->argc < -c->cmd->arity)) { // 检测命令格式

    addReplyErrorFormat(c,"wrong number of arguments for '%s' command",

              c->cmd->name);

    return REDIS_OK;

 }// 检测客户端权限

  if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)

 {

    addReplyError(c,"operation not permitted");

    return REDIS_OK;

 }// redis 达到最大占用内存,那么将oom信息发送给客户都安

  if (server.maxmemory) {

    int retval = freeMemoryIfNeeded();

    if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {

      addReply(c, shared.oomerr);

      return REDIS_OK;

   }

 }// 如果磁盘存在问题,不要接受任何写命令

  if (server.stop_writes_on_bgsave_err &&

    server.saveparamslen > 0

    && server.lastbgsave_status == REDIS_ERR &&

    c->cmd->flags & REDIS_CMD_WRITE)

 {

    addReply(c, shared.bgsaveerr);

    return REDIS_OK;

 }// 哨兵模式的读模式,也不接受任何写命令

  if (server.masterhost && server.repl_slave_ro &&

    !(c->flags & REDIS_MASTER) &&

    c->cmd->flags & REDIS_CMD_WRITE)

 {

    addReply(c, shared.roslaveerr);

    return REDIS_OK;

 }// 只允许在发布/订阅上下文中订阅和取消订阅

  if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)

    && // 命令执行函数必须是订阅相关命令

    c->cmd->proc != subscribeCommand && 

    c->cmd->proc != unsubscribeCommand &&

    c->cmd->proc != psubscribeCommand &&

    c->cmd->proc != punsubscribeCommand) {

    addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");

    return REDIS_OK;

 }

  // 只有当slave-server-stale-data配置为 no 时,并且我们是一个与主库断开的slave时,才允许执行INFO和SLAVEOF 命令

  if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&

    server.repl_serve_stale_data == 0 &&

    !(c->cmd->flags & REDIS_CMD_STALE))

 {

    addReply(c, shared.masterdownerr);

    return REDIS_OK;

 }// 服务器正在加载数据,不接受客户端请求

  if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {

    addReply(c, shared.loadingerr);

    return REDIS_OK;

 }// lua脚本执行过慢,只允许执行带有REDIS_CMD_STALE标志的命令

  if (server.lua_timedout &&

    c->cmd->proc != authCommand &&

    !(c->cmd->proc == shutdownCommand &&

     c->argc == 2 &&

     tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&

    !(c->cmd->proc == scriptCommand &&

     c->argc == 2 &&

     tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))

 {

    addReply(c, shared.slowscripterr);

    return REDIS_OK;

 }// 检测无误,那么执行命令

  if (c->flags & REDIS_MULTI &&

    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&

    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)

 { // 执行 MULTI 命令,也即多操作命令

    queueMultiCommand(c);

    addReply(c,shared.queued);

 } else { // 执行单命令

    call(c,REDIS_CALL_FULL);

    if (listLength(server.ready_keys))

      handleClientsBlockedOnLists();

 }

  return REDIS_OK;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213

lookupCommand 函数

该函数用于从server.commands中获取到name指定的函数指针,并将其命令返回,而server.commands在server初始化时由redisCommandTable数组中定义。源码如下。

// 填充命令列表放入server.commands中

struct redisCommand redisCommandTable[] = {...};

void populateCommandTable(void) {

  for (j = 0; j < numcommands; j++) {

    struct redisCommand *c = redisCommandTable+j;

   ...

    retval = dictAdd(server.commands, sdsnew(c->name), c);

 }

}struct redisCommand *lookupCommand(sds name) {

  return dictFetchValue(server.commands, name);

}void *dictFetchValue(dict *d, const void *key) {

  dictEntry *he;

​

  he = dictFind(d,key);

  return he ? dictGetVal(he) : NULL;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

queueMultiCommand 函数

该函数用于将multi命令添加到multi队列中。源码如下。

void queueMultiCommand(redisClient *c) {

  multiCmd *mc;

  int j;

  // 增加commands队列大小,并将redisClient 数据放入新增的一项multiCmd中

  c->mstate.commands = zrealloc(c->mstate.commands,

                 sizeof(multiCmd)*(c->mstate.count+1));

  mc = c->mstate.commands+c->mstate.count; 

  mc->cmd = c->cmd;

  mc->argc = c->argc;

  // 将redisClient的argv复制到mc->argv中,因为multi命令执行与客户端redisClient结构再无关系

  mc->argv = zmalloc(sizeof(robj*)*c->argc);

  memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);

  for (j = 0; j < c->argc; j++)

    incrRefCount(mc->argv[j]);

  c->mstate.count++;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

call 函数

该函数将调用c->cmd->proc(c)设置的函数完成客户端命令执行。源码描述如下。

void call(redisClient *c, int flags) {

  long long dirty, start = ustime(), duration;

 ...

  // 执行命令

  redisOpArrayInit(&server.also_propagate);

  dirty = server.dirty; // 保存服务器为脏状态,也即是否修改了内存数据

  c->cmd->proc(c);

  dirty = server.dirty-dirty; // 看看执行完命令后,是否修改了内存数据

  duration = ustime()-start; // 记录命令执行时间// 当调用EVAL加载AOF时,我们不希望从Lua调用的命令影响慢日志或填充统计信息

  if (server.loading && c->flags & REDIS_LUA_CLIENT)

    flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);// 记录慢操作日志

  if (flags & REDIS_CALL_SLOWLOG)

    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

  if (flags & REDIS_CALL_STATS) {

    c->cmd->microseconds += duration;

    c->cmd->calls++;

 }

  // 将命令复制到aof和slave复制队列

  if (flags & REDIS_CALL_PROPAGATE) {

    int flags = REDIS_PROPAGATE_NONE;

    if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)

      flags |= REDIS_PROPAGATE_REPL;

    if (dirty)

      flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);

    if (flags != REDIS_PROPAGATE_NONE)

      propagate(c->cmd,c->db->id,c->argv,c->argc,flags);

 }

  // 诸如 LPUSH 或 BRPOPLPUSH 之类的命令可能传递额外的push命令

  if (server.also_propagate.numops) {

    int j;

    redisOp *rop;

    for (j = 0; j < server.also_propagate.numops; j++) {

      rop = &server.also_propagate.ops[j];

      propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);

   }

    redisOpArrayFree(&server.also_propagate);

 }

  server.stat_numcommands++;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

addReply 函数

该函数用于将 robj *obj 的redis 对象写入客户端。这里我们先尝试写入缓冲区,失败后将robj放入reply列表,将会在sendReplyToClient函数中处理。源码如下。

void addReply(redisClient *c, robj *obj) {

  if (prepareClientToWrite(c) != REDIS_OK) return; 

  if (obj->encoding == REDIS_ENCODING_RAW) { // RAW 原始编码,那么将数据写入客户端的redis层面的写缓冲区中( char buf[REDIS_REPLY_CHUNK_BYTES])若写入失败,那么将其写入list *reply列表

    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)

      _addReplyObjectToList(c,obj);

 } else if (obj->encoding == REDIS_ENCODING_INT) { // INT 整形编码,那么将其转为字符串后,写入缓冲区,同样写入失败后,将其放入list *reply列表

    if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) { // reply列表为空,同时缓冲区中存在空间可以写入32个字符(这里32个字符为优化操作,如果缓冲区中有32个字节的空间(超过64位的整数可以用作字符串的最大字符数),这就避免解码对象,读者可以自行查看ll2string的代码)

      char buf[32];

      int len;

      len = ll2string(buf,sizeof(buf),(long)obj->ptr);

      if (_addReplyToBuffer(c,buf,len) == REDIS_OK) // 由于预判了存在足够大的缓冲区,所以直接写入即可

        return;

   }

    obj = getDecodedObject(obj); // 否则解码对象,然后将其再次尝试添加到缓冲区,失败后将其添加到reply列表

    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)

      _addReplyObjectToList(c,obj);

    decrRefCount(obj); // obj写入成功,那么释放一个引用次数

 } else {

    redisPanic("Wrong obj->encoding in addReply()");

 }

}// 检测客户端状态,并将客户端socket fd注册到多路复用器中,写函数为sendReplyToClient,当客户端fd的写缓冲区空闲时将会调用sendReplyToClient完成写入

int prepareClientToWrite(redisClient *c) {

  if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;

  if (c->fd <= 0) return REDIS_ERR; // 虚拟客户端,比如LUA 脚本执行

  if (c->bufpos == 0 && listLength(c->reply) == 0 &&

   (c->replstate == REDIS_REPL_NONE ||

    c->replstate == REDIS_REPL_ONLINE) &&

    aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,

             sendReplyToClient, c) == AE_ERR) return REDIS_ERR;

  return REDIS_OK;

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

sendReplyToClient 函数

该函数在ae时间循环中调用,当socket fd 写缓冲区空闲时调用,将redis的写缓冲区的数据和reply链表的数据写出(读者注意:这里需要写入的数据为redis写缓冲区和reply链表,而又由于fd的内核写缓冲区有限,所以有可能一次写入不成功,那么可以在下一次ae发现写缓冲区不为满状态时,再次由ae事件循环完成写出)。源码描述如下。

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {

  redisClient *c = privdata;

  int nwritten = 0, totwritten = 0, objlen;

  size_t objmem;

  robj *o;

  REDIS_NOTUSED(el);

  REDIS_NOTUSED(mask);

  // 存在数据需要写出

  while(c->bufpos > 0 || listLength(c->reply)) {

    if (c->bufpos > 0) {

      if (c->flags & REDIS_MASTER) { // 若客户端时主库,那么不能写出,主库必须回应

        nwritten = c->bufpos - c->sentlen;

     } else { // 开始写出客户都安

        nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);

        if (nwritten <= 0) break; // fd写缓冲区满,那么结束写入

     }

      c->sentlen += nwritten;

      totwritten += nwritten;

      if (c->sentlen == c->bufpos) { // 客户端redis写缓冲区中的数据已经完全写出

        c->bufpos = 0;

        c->sentlen = 0;

     }

   } else { // 处理reply中的数据,将其尝试写出

      o = listNodeValue(listFirst(c->reply));

      objlen = sdslen(o->ptr);

      objmem = zmalloc_size_sds(o->ptr);

      if (objlen == 0) { // 当前robj不存在数据尝试链表中下一个

        listDelNode(c->reply,listFirst(c->reply));

        continue;

     }

      if (c->flags & REDIS_MASTER) {

 

        nwritten = objlen - c->sentlen;

     } else { // 开始写出

        nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);

        if (nwritten <= 0) break;

     }

      c->sentlen += nwritten;

      totwritten += nwritten;

      // 完全写出第一个robj,那么继续写出下一个

      if (c->sentlen == objlen) {

        listDelNode(c->reply,listFirst(c->reply));

        c->sentlen = 0;

        c->reply_bytes -= objmem;

     }

   }

    // 避免一次写入过多数据

    if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&

     (server.maxmemory == 0 ||

      zmalloc_used_memory() < server.maxmemory)) break;

 }

  if (nwritten == -1) { // 写入失败,那么打印日志

    if (errno == EAGAIN) { // EAGAIN表示重试,那么不需要报警,下一次fd写缓冲区可用时再次写入即可

      nwritten = 0;

   } else {

      redisLog(REDIS_VERBOSE,

          "Error writing to client: %s", strerror(errno));

      freeClient(c);

      return;

   }

 }

  // 成功写出数据,那么记录最后一次写出数据时间

  if (totwritten > 0) c->lastinteraction = server.unixtime;

  if (c->bufpos == 0 && listLength(c->reply) == 0) { // 全部数据写出完成(写缓冲区和reply链表),那么将器从多路复用器中解除写事件注册,因为不需要再写入其他事件

    c->sentlen = 0;

    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);

 }

}



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140