# Redis IO多线程原理

最近看到一堆鼓吹Redis 6.0 IO 多线程的高性能,所以提前分析下它的原理

众所周知Redis的事件循环模型并没有使用libevent、libev、libuv等库,而是自己实现了一套称之为ae的事件循环模型,而在最新的redis中将原来的单线程模型改进为了多线程处理IO操作的线程,也即单线程接收客户端连接,多个IO线程同时处理这些客户端连接的Read、Write操作实现并行写增加了性能。但同时很多朋友并不知道其中的处理原理,并且经过笔者的阅览,对比与Netty和Tomcat的线程模型,笔者认为该模型有待优化。这篇文章我们着重讨论以下几个内容:

  1. ae主线程轮询事件原理
  2. 服务端监听并处理客户端连接原理
  3. IO多线程处理的过程
  4. 多线程处理的改进

注:本文采用最新的redis 6.0的源码分析。

Redis ae主线程处理事件原理

我们先来看在server.c文件的main函数中执行的过程。我们看到由主线程调用aeMain函数进行轮询处理事件,我们这里省略了对于事件循环的初始化过程,把关注点放在事件处理上。而aeMain函数中使用了一个循环调用aeProcessEvents函数来处理eventLoop中准备好的事件,同时使用AE_ALL_EVENTS标志位标识处理所有事件,AE_CALL_BEFORE_SLEEP和AE_CALL_AFTER_SLEEP标志位标识在执行aeApiPoll函数的前后调用回调函数:beforesleep和aftersleep。详细实现如下。

int main(int argc, char **argv) {

 ...

  aeMain(server.el);

 ...

}void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  // 循环处理事件,直到显示指定停止

  while (!eventLoop->stop) {

    aeProcessEvents(eventLoop, AE_ALL_EVENTS|

            AE_CALL_BEFORE_SLEEP|

            AE_CALL_AFTER_SLEEP);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

接下来我们来看aeProcessEvents函数的实现过程。我们看到首先判断eventLoop->maxfd和标志位,maxfd保存了当前注册到事件循环中的最大文件描述符,当该描述符存在的情况下,肯定有事件被注册了,那么可以直接执行,同时我们看到aeApiPoll函数调用前后回调了beforesleep和aftersleep函数,因为aeApiPoll函数底层可能是poll、select、epoll等函数,根据tvp来设置睡眠时间,所以这里的回调函数名为睡眠前后回调函数,该函数会把准备好的事件放入eventLoop->fired数组中,我们通过循环获取该数组中的aeFileEvent对象,根据准备好的事件类型来调用不同的函数,也即如果是读那么调用rfileProc,如果是写那么调用wfileProc函数。详细实现如下,其中笔者省略掉了一些变量的定义保留了核心操作。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

 ...

  // 处理事件

  if (eventLoop->maxfd != -1 ||

   ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

   ...

    // 调用睡眠前回调函数,因为aeApiPoll函数底层可能是poll、select、epoll等函数,根据tvp来设置睡眠时间

    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)

      eventLoop->beforesleep(eventLoop);

    // 获取所有准备好的事件,结果存放在aeEventLoop结构中

    numevents = aeApiPoll(eventLoop, tvp);

    // 获取事件完成后回调aftersleep回调函数

    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

      eventLoop->aftersleep(eventLoop);

    // 循环处理所有事件

    for (j = 0; j < numevents; j++) {

      // aeEventLoop结构中的fired数组保存了所有准备好的事件

      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

     ...

      // 处理读事件

      if (!invert && fe->mask & mask & AE_READABLE) {

        fe->rfileProc(eventLoop,fd,fe->clientData,mask);

        fired++;

        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */

     }

      // 处理写事件

      if (fe->mask & mask & AE_WRITABLE) {

        if (!fired || fe->wfileProc != fe->rfileProc) {

          fe->wfileProc(eventLoop,fd,fe->clientData,mask);

          fired++;

       }

     }

     ...

      processed++;

   }

 }

  // 执行时间事件

  if (flags & AE_TIME_EVENTS)

    processed += processTimeEvents(eventLoop);

  return processed; /* return the number of processed file/time events */

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

接下来我们来看aeApiPoll函数,该函数用于从内核中获取准备好的事件对象,并将其放入eventLoop->fired数组中,我们这里选取Linux平台下epoll函数的实现过程。我们看到首先调用epoll_wait函数从内核中获取准备好的事件,然后遍历这些事件,将epoll层面的标志位EPOLLIN(读事件)、EPOLLOUT(写事件)转变为Redis层面的标志位AE_READABLE(读事件)、AE_WRITABLE(写事件),随后将eventLoop->fired数组中相应的aeFileEvent结构位的fd和mask初始化。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

  // 获取api状态结构

  aeApiState *state = eventLoop->apidata;

  int retval, numevents = 0;

  // 从内核中获取准备好的事件对象,该对象保存在state->events数组中,tvp指明了睡眠时间

  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,

            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

  // 当前有准备好的事件,retval变量保存了准备好的事件数

  if (retval > 0) {

    int j;

    numevents = retval;

    // 遍历所有的事件

    for (j = 0; j < numevents; j++) {

      int mask = 0;

      // 这里我们采用C语言中对指针的加法来完成数组的遍历

      struct epoll_event *e = state->events+j;

      // 通过事件类型来设置在redis层面使用的事件类型

      if (e->events & EPOLLIN) mask |= AE_READABLE;

      if (e->events & EPOLLOUT) mask |= AE_WRITABLE;

      if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;

      if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;

      // 将fired数组中相应位置的aeFileEvent结构的fd和mask初始化

      eventLoop->fired[j].fd = e->data.fd;

      eventLoop->fired[j].mask = mask;

   }

 }

  return numevents;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

Redis Server 套接字初始化过程

本小节我们来看看server端用于监听客户端套接字的事件,我们看到同样在main函数中调用initServer函数初始化redis服务端,在该函数中调用aeCreateFileEvent函数将监听客户端连接的fd注册到循环组中。实现原理如下。

int main(int argc, char **argv) {

 ...

  // 初始化redis服务器

  initServer();

 ...

}void initServer(void) {

 ...

  // 循环初始化ae事件循环监听客户端的端口,通常我们这里只监听一个端口

  for (j = 0; j < server.ipfd_count; j++) {

    // 向ae事件循环组中注册该fd事件为AE_READABLE读事件,同时指定回调函数为acceptTcpHandler

    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,

      acceptTcpHandler,NULL) == AE_ERR)

     {

        serverPanic(

          "Unrecoverable error creating server.ipfd file event.");

     }

 }

 ...

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

接下来我们来看aeCreateFileEvent函数的实现原理。我们看到fd保存了eventLoop->events的下标,我们首先调用aeApiAddEvent函数将该fd和它感兴趣的事件集注册到事件循环中,随后根据读写事件集注册aeFileEvent结构的读写回调事件,我们这里以read和accept操作作为例子,所以我们这里设置的是acceptTcpHandler函数。详细实现如下。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,

           aeFileProc *proc, void *clientData)

{

  // 首先检测fd索引不能超过最大数组下标

  if (fd >= eventLoop->setsize) {

    errno = ERANGE;

    return AE_ERR;

 }

  // 获取下标对应的aeFileEvent事件结构体

  aeFileEvent *fe = &eventLoop->events[fd];

  // 将其添加到事件循环中,mask指明了感兴趣的事件集

  if (aeApiAddEvent(eventLoop, fd, mask) == -1)

    return AE_ERR;

  // 保存事件集并绑定事件执行函数

  fe->mask |= mask;

  if (mask & AE_READABLE) fe->rfileProc = proc;

  if (mask & AE_WRITABLE) fe->wfileProc = proc;

  fe->clientData = clientData;

  // 如果当前下标超过最大下标,那么当前下边为事件循环所持有的最大下标

  if (fd > eventLoop->maxfd)

    eventLoop->maxfd = fd;

  return AE_OK;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

最后我们来看aeApiAddEvent函数的实现过程。我们这里同样以epoll函数为例,我们看到首先根据之前fd是否存在于epoll中来选择op为EPOLL_CTL_ADD新增操作还是EPOLL_CTL_MOD修改操作,然后将redis的事件集转变为epoll的事件集,随后调用epoll_ctl将该fd及其事件集放入内核中。详细实现如下。

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

  // 获取ae状态结构体

  aeApiState *state = eventLoop->apidata;

  struct epoll_event ee = {0}; 

  // 根据该fd的状态来选择是添加还是修改,如果之前已经注册过该fd,那么events[fd].mask不为AE_NONE,那么将是修改操作所以op为EPOLL_CTL_MOD,否则为EPOLL_CTL_ADD

  int op = eventLoop->events[fd].mask == AE_NONE ?

    EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  ee.events = 0;

  // 混合之前注册的事件集,如果是新增,那么这里只有mask

  mask |= eventLoop->events[fd].mask;

  // 将Redis的事件集转换为Epoll使用的事件集

  if (mask & AE_READABLE) ee.events |= EPOLLIN; // 读事件

  if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 写事件

  ee.data.fd = fd;

  // 将该fd注册到epoll中

  if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

  return 0;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

Redis Server 处理客户端连接过程

我们知道当ae事件循环从事件循环中获取到准备好的事件,那么将会根据事件集类型回调其操作,我们在上一节看到服务端fd注册了读事件,同时设置了回调事件为acceptTcpHandler函数,Java Coder注意下,这里不像Java的Selector将事件细分为OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ事件,对于epoll而言它只知道读写事件,而我们这里将回调函数设置为不同的函数从而实现了不同事件的处理过程。这一节我们来看看当redis接收到客户端连接后如何处理。

我们先来看acceptTcpHandler函数处理,当客户端连接后将会回调该函数,我们看到循环调用anetTcpAccep处理客户端连接,同时为了保证一次处理的连接数,这里使用了最大值MAX_ACCEPTS_PER_CALL来限制一次接受的客户端连接,该值默认为1000。接收到连接后随后调用connCreateAcceptedSocket创建connection结构体,随后调用acceptCommonHandler函数处理该连接。详细实现如下

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

  int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

  char cip[NET_IP_STR_LEN];

  // 限制每次处理的客户端连接最大值为MAX_ACCEPTS_PER_CALL 1000

  while(max--) {

    // 接受客户端连接接

    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

    // 如果发生错误那么返回

    if (cfd == ANET_ERR) {

      return;

   }

    // 处理客户端连接

    acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

接下来我们来看anetTcpAccept函数如何处理客户端连接。我们看到该函数调用anetGenericAccept来接收客户端连接,在anetGenericAccept函数中通过调用accept函数接收客户端连接,由于我们将该fd设置为了非阻塞,所以如果没有客户端连接,那么将会返回-1。详细实现如下。

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {

 ...

  if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)

    return ANET_ERR;

 ...

  return fd;

}static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {

  int fd;

  while(1) {

    // 调用该函数获取客户端连接

    fd = accept(s,sa,len);

    if (fd == -1) {

      if (errno == EINTR)

        continue;

      else {

        anetSetError(err, "accept: %s", strerror(errno));

        return ANET_ERR;

     }

   }

    break;

 }

  return fd;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

接下来我们来看connCreateAcceptedSocket函数如何根据客户端连接fd创建connection结构体,我们看到首先调用connCreateSocket函数创建connection结构,该函数首先分配connection结构的内存空间,随后设置type字段为&CT_Socket,该结构中定义了大量回调函数,将会在后面处理客户端连接时调用,随后保存当前客户端fd,同时设置状态为CONN_STATE_ACCEPTING表示正在处理中。详细实现如下。

connection *connCreateAcceptedSocket(int fd) {

  // 创建connection结构

  connection *conn = connCreateSocket();

  // 保存fd并将connection结构的状态设置为CONN_STATE_ACCEPTING,表示正在处理中

  conn->fd = fd;

  conn->state = CONN_STATE_ACCEPTING;

  return conn;

}

​

connection *connCreateSocket() {

  // 分配结构体内存

  connection *conn = zcalloc(sizeof(connection));

  // 设置类型为CT_Socket,该CT_Socket中定义了回调函数

  conn->type = &CT_Socket;

  // 初始化fd为-1

  conn->fd = -1;

  return conn;

}// 回调函数定义

ConnectionType CT_Socket = {

 .ae_handler = connSocketEventHandler,

 .close = connSocketClose,

 .write = connSocketWrite,

 .read = connSocketRead,

 .accept = connSocketAccept,

 .connect = connSocketConnect,

 .set_write_handler = connSocketSetWriteHandler,

 .set_read_handler = connSocketSetReadHandler,

 .get_last_error = connSocketGetLastError,

 .blocking_connect = connSocketBlockingConnect,

 .sync_write = connSocketSyncWrite,

 .sync_read = connSocketSyncRead,

 .sync_readline = connSocketSyncReadLine,

 .get_type = connSocketGetType

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

最后我们来看acceptCommonHandler函数如何处理connCreateAcceptedSocket函数中创建的处于CONN_STATE_ACCEPTING状态的connection结构体。我们看到首先调用createClient函数创建客户端连接结构体client。详细实现如下。

static void acceptCommonHandler(connection *conn, int flags, char *ip) {

 ...

  // 创建客户端连接结构体client

  if ((c = createClient(conn)) == NULL) {

   ...

    return;

 }

 ...

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

我们来看createClient函数创建客户端连接的核心步骤。我们看到首先分配客户端结构client的内存,随后配置客户端fd为非阻塞式IO,配置TcpNoDelay,设置保活配置,随后设置客户端连接处理器为readQueryFromClient,最后将客户端连接到全局客户端列表。详细实现如下,当然笔者这里省略了对结构体client的其他属性的初始化过程。

client *createClient(connection *conn) {

  // 分配客户端结构内存

  client *c = zmalloc(sizeof(client));

  if (conn) {

    // 配置非阻塞式IO

    connNonBlock(conn);

    // 配置TcpNoDelay

    connEnableTcpNoDelay(conn);

    // 设置保活配置

    if (server.tcpkeepalive)

      connKeepAlive(conn,server.tcpkeepalive);

    // 设置客户端连接处理器为readQueryFromClient

    connSetReadHandler(conn, readQueryFromClient);

    // 将client结构设置为connection结构的私有数据PrivateData

    connSetPrivateData(conn, c);

 }

 ...

  // 将客户端连接到全局客户端列表

  if (conn) linkClient(c);

 ...

  return c;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

我们知道客户端对象接收到了连接,也需要将其放入ae事件循环中,我们通过这些函数调用并没有看到这个操作,那么是哪里的函数调用没有看到呢?答案是必须的,不然怎么接收客户端的读请求,我们来看connSetReadHandler(conn, readQueryFromClient);这个函数的实现原理。可以看到该函数直接调用我们之前看到的CT_Socket结构体定义的set_read_handler函数。而该函数将readQueryFromClient函数设置为连接的read_handler,我们在前面聊ae循环处理事件原理时,了解到当该客户端连接可读时将会调用该函数,同时调用aeCreateFileEvent将该客户端fd添加到事件循环中,并将事件集设置为AE_READABLE。这时当客户端数据到来时将由ae事件循环来调用readQueryFromClient函数来处理。详细源码如下。

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {

  return conn->type->set_read_handler(conn, func);

}// CT_Socket结构体定义

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {

  if (func == conn->read_handler) return C_OK;

​

  conn->read_handler = func;

  if (!conn->read_handler)

    aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);

  else

    if (aeCreateFileEvent(server.el,conn->fd,

          AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;

  return C_OK;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

最后我们来看readQueryFromClient函数如何处理客户端的数据,我们看到首先调用postponeClientRead函数尝试将客户端连接放入到read pending队列中,随后由IO 线程来并行处理,如果调用成功,直接返回,否则我们调用connRead函数读取客户端命令,然后调用processInputBuffer函数处理该命令。详细源码如下。

void readQueryFromClient(connection *conn) {

 ...

  // 将客户端连接放入到read pending队列中,随后由IO 线程来并行处理

  if (postponeClientRead(c)) return;

 ...

  // 读取客户端数据

  nread = connRead(c->conn, c->querybuf+qblen, readlen);

 ...

  // 处理客户端数据

  processInputBuffer(c);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

我们来看postponeClientRead如何将客户端放入到read pending队列。我们看到判断条件为五个,读者这里特别关注CLIENT_PENDING_READ这个标识符,该标识符用于表示该客户端不会再次放入clients_pending_read队列。详细源码及解释如下。

int postponeClientRead(client *c) {

  if (server.io_threads_active && // IO线程激活

    server.io_threads_do_reads && // 配置从IO线程读取命令

    !clientsArePaused() && // redis服务没有设置客户端暂停处理

    !ProcessingEventsWhileBlocked && // 没有设置阻塞时处理事件

    !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) // 当前客户端不属于MASTER或者CLIENT_SLAVE,同时状态也不属于CLIENT_PENDING_READ

 {

    // 将该客户端状态设置为CLIENT_PENDING_READ表明挂起待IO线程读取数据

    c->flags |= CLIENT_PENDING_READ;

    // 将该客户端放入全局clients_pending_read队列中

    listAddNodeHead(server.clients_pending_read,c);

    return 1;

 } else {

    return 0;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

Redis IO线程初始化和执行过程

我们还是先来IO线程的初始化过程,初始化动作同样在server.c的main函数中执行。我们看到在main函数中调用InitServerLast完成server的最终初始化,在InitServerLast中调用了initThreadedIO函数初始化了IO线程,在initThreadedIO方法中我们创建了io_threads_list数组,该列表保存了每个IO线程需要处理的客户端,也即数组中的每一项也是一个列表,io_threads_pending数组用于标识IO线程的工作状态,默认初始化为0,io_threads数组用于保存各个线程的id,线程的执行体为IOThreadMain。详细源码如下。

int main(int argc, char **argv) {

 ...

  InitServerLast();

 ...

}

void InitServerLast() {

 ...

  initThreadedIO();

 ...

}

void initThreadedIO(void) {

 ...

  for (int i = 0; i < server.io_threads_num; i++) {

    // 创建待处理客户端连接列表

    io_threads_list[i] = listCreate();

    if (i == 0) continue; /* Thread 0 is the main thread. */

    // 保存线程id

    pthread_t tid;

    pthread_mutex_init(&io_threads_mutex[i],NULL);

    // 将处理标志位初始化为0

    io_threads_pending[i] = 0;

    pthread_mutex_lock(&io_threads_mutex[i]);

    // 启动线程

    if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {

      serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");

      exit(1);

   }

    // 保存线程id

    io_threads[i] = tid;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

接下来我们来看线程创建成功后执行的线程执行体IOThreadMain函数执行过程。我们看到如果当前线程的io_threads_pending为0那么自旋直到io_threads_pending不为0,读者注意该操作容易导致服务端CPU利用率较高,当客户端连接被主线程挂起到当前线程的io_threads_list项时,将会把io_threads_pending设置为非0数,这时当前线程开始处理该连接,同时根据连接的状态IO_THREADS_OP_WRITE或者IO_THREADS_OP_READ来调用写或者读操作,我们这里标识的即为IO_THREADS_OP_READ读操作,这时会再次调用readQueryFromClient函数读取客户端数据,读者这里要特别注意该函数再次调用时将不会再次将自己挂起到read pending队列,因为这时的状态已经被设置为了CLIENT_PENDING_READ,详细请参考前面聊的postponeClientRead函数的实现。详细源码如下。

void *IOThreadMain(void *myid) {

 ...

  while(1) {

    // 自旋直到io_threads_pending中当前线程项不为0

    for (int j = 0; j < 1000000; j++) {

      if (io_threads_pending[id] != 0) break;

   }

   ...

    // 执行当前线程自己的io_threads_list中的客户端

    listIter li;

    listNode *ln;

    listRewind(io_threads_list[id],&li);

    // 遍历所有列表上的客户端,根据读写事件调用readQueryFromClient或者writeToClient函数

    while((ln = listNext(&li))) {

      client *c = listNodeValue(ln);

      if (io_threads_op == IO_THREADS_OP_WRITE) {

        writeToClient(c,0);

     } else if (io_threads_op == IO_THREADS_OP_READ) {

        readQueryFromClient(c->conn);

     } else {

        serverPanic("io_threads_op value is unknown");

     }

   }

    // 清空列表并标识状态为0

    listEmpty(io_threads_list[id]);

    io_threads_pending[id] = 0;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

Redis 主线程处理pending客户端连接原理

我们前面说到在readQueryFromClient函数中将客户端通过postponeClientRead函数放到了read pending队列,而在main函数中调用initThreadedIO函数创建了IO线程,而IO线程启动后将会执行IOThreadMain函数,该函数从io_threads_list队列中获取到客户端对象来执行读写操作,那么问题来了:谁负责将read pending队列的客户端结构放入到这些IO线程的io_threads_list队列呢?那必然是主线程。那么主线程又是在哪里放入的呢?这个就需要我们前面聊得ae在调用aeApiPoll获取准备好事件前的beforeSleep函数了。那么我们先来看该函数的原理。

void beforeSleep(struct aeEventLoop *eventLoop) {

 ...

  // 处理read pending队列的客户端队列

  handleClientsWithPendingReadsUsingThreads();

 ...

}int handleClientsWithPendingReadsUsingThreads(void) {

  // 获取clients_pending_read队列列表迭代器

  listIter li;

  listNode *ln;

  listRewind(server.clients_pending_read,&li);

  int item_id = 0;

  // 遍历所有待读取的客户端,并将其散列到不同IO线程处理列表中

  while((ln = listNext(&li))) {

    client *c = listNodeValue(ln);

    // 通过取余方式散列获取IO线程下标

    int target_id = item_id % server.io_threads_num;

    // 将该客户端放入该下标列表中

    listAddNodeTail(io_threads_list[target_id],c);

    item_id++;

 }

  // 所有连接放入到IO线程处理列表后将IO线程操作标识为IO_THREADS_OP_READ读操作

  io_threads_op = IO_THREADS_OP_READ;

  for (int j = 1; j < server.io_threads_num; j++) {

    // 设置io_threads_pending为非零数,也即当前需要处理的客户端数量,这时线程将会响应该操作,开始处理客户端连接

    int count = listLength(io_threads_list[j]);

    io_threads_pending[j] = count;

 }//io_threads_list数组0下标处为main线程处理,也即main线程处理一部分读IO

  listRewind(io_threads_list[0],&li);

  while((ln = listNext(&li))) {

    client *c = listNodeValue(ln);

    readQueryFromClient(c->conn);

 }

  // 清空主线程负责的下标为0的客户端列表,其他的下标由IO线程自己处理

  listEmpty(io_threads_list[0]);

  // 自旋等嗲其他线程处理IO完毕

  while(1) {

    unsigned long pending = 0;

    for (int j = 1; j < server.io_threads_num; j++)

      pending += io_threads_pending[j];

    if (pending == 0) break;

 }

  // 当所有IO线程将clients_pending_read的客户端读IO处理完毕后,在主线程中处理客户端命令

  while(listLength(server.clients_pending_read)) {

    ln = listFirst(server.clients_pending_read);

    client *c = listNodeValue(ln);

    // 去掉CLIENT_PENDING_READ标志位,并将其从clients_pending_read队列中移除

    c->flags &= ~CLIENT_PENDING_READ;

    listDelNode(server.clients_pending_read,ln);

    // 如果设置暂停客户端请求那么继续循环

    if (clientsArePaused()) continue;

    // 处理客户端命令

    if (processPendingCommandsAndResetClient(c) == C_ERR) {

      continue;

   }

    processInputBuffer(c);

    // 如果处理完毕且有数据需要写回,那么将客户端放入clients_pending_write队列等待IO线程完成写操作

    if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))

      clientInstallWriteHandler(c);

 }

  server.stat_io_reads_processed += processed;

  return processed;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131

总结

最终我们可以得出以下结论:

  1. Redis的主线程处理客户端连接操作
  2. Redis的IO线程处理客户端的读、写操作
  3. Redis IO线程处理时,Redis主线程处理部分连接完毕后需要等待IO线程处理客户端完成
  4. Redis 线程模型可以进行改进,可以操作Tomcat或者Netty的Reactor模型进行改进,也即:
  5. 一个线程接收请求
  6. IO线程负责处理读写操作
  7. 操作线程用于单线程执行命令
  8. 不同redis DB拥有自己的操作线程

总而言之,Redis基于事件回调的操作,如果不仔细看源码将会非常迷糊,本文将Redis的线程模型完全讲解,其中以读请求和接收连接作为例子,对于写请求操作亦是如此。我们可以简单地用一段话来描述Redis的请求处理流程:Redis主线程一次性获取最大为1000个客户端连接,将其放入到read pending队列中,在下一次aeMain主循环中调用beforeSleep函数,该函数将read pending队列和write pending队列中的客户端散列到IO线程中执行读写操作,并且自身负责下标为0处的客户端,然后等待IO线程完毕后再执行。所以,多线程了个寂寞,仅仅只是减少了IO的时间,事实上主线程这段时间完全可以做些别的事情,CPU的指令流水线和微服务的消息队列是最好的实践,为何不采用流式处理,这样各个线程各司其职岂不快哉?不要问redis作者为什么,他可能会说我乐意,你别用啊。。。就像之前坚持使用CLOCK_REALTIME wall time一样。