# Redis 深度分析系列四

前面我们详细分析了Redis 通过 ae 模型 实现了基于 Linux 中的两大多路选择器的事件驱动模型,但是我们仅仅看到了在 ae.h 中定义的结构体和函数指针 和 ae_epoll.c 、ae_select.c 对操作系统提供的函数库的调用,那么本节我们将详细介绍Redis 中对于 这些函数指针的实现,以及Redis如何根据 所处 OS 不同选择系统提供的多路选择器。

如何根据环境选择多路复用器

先复习下C语言的基础:条件宏定义、.h和.c 文件的区别(详细可参考:混沌学堂描述)。我们先来看redis.h文件的文件引入。

#include "ae.h"   /* 事件驱动函数库 */
1

然后我们来看redis.c 中对事件驱动函数库的调用。通过我们在混沌学堂学习的ELF原理,很明显,当汇编器生成redis.o和ae.o 时,由于redis .h 中引入了ae.h,此时将会和ae.o进行静态链接,所以此时我们只需要关注ae.c即可。

void initServer() {

  server.el = aeCreateEventLoop(server.maxclients+1024);

 ...

  aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

  if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,

    acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");

  if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,

    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

}int main(int argc, char **argv) {

 ...

  aeMain(server.el);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

我们来看ae.c中的宏定义。很明显,redis 将根据 OS 的不同自动通过条件宏选择合适的高性能多路复用器使用,而又由于在ae.h中定义了函数指针和公用结构体,所以每个实现都包含了相同的结构和函数实现,所以我们可以说Redis通过指针和结构体实现了面向对象语言中的接口和实现。

#ifdef HAVE_EVPORT // solaris 的多路复用器

#include "ae_evport.c"

#else

  #ifdef HAVE_EPOLL // linux 的多路复用器

  #include "ae_epoll.c"

  #else

    #ifdef HAVE_KQUEUE // macos 的多路复用器

    #include "ae_kqueue.c"

    #else

    #include "ae_select.c" // 类Unix都实现的多路复用器性能较差

    #endif

  #endif

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

那么我们继续跟进:Redis 又是在哪里 定义这些宏的呢:HAVE_EVPORT、HAVE_EPOLL、HAVE_KQUEUE,很明显:一看就是配置文件中定义的,来看config.h。那么,我们可以看到,将根据OS 的类型来定义上述宏,于是在ae.c中将自动引入对应事件驱动的实现,然后与ae.c中的其他函数共同汇编成静态链接库:ae.o(详细参考混沌学堂对于ELF的详细描述)。

#ifdef __linux__

#define HAVE_EPOLL 1

#endif#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)

#define HAVE_KQUEUE 1

#endif#ifdef __sun

#include <sys/feature_tests.h>

#ifdef _DTRACE_VERSION

#define HAVE_EVPORT 1

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

可能读者又会存在疑惑:linux 宏又是怎么定义的呢?通常我们会在MakeFile文件中定义: CFLAGS += -Dlinux

ae.c 函数实现

aeCreateEventLoop

该函数用于根据设置的setsize创建事件循环。可以看到这里首先分配aeEventLoop结构的空间,然后设置属性,随后调用aeApiCreate 函数,该函数将完成实际与OS相关的多路复用函数完成对aeEventLoop的进一步初始化。源码如下。

aeEventLoop *aeCreateEventLoop(int setsize) {

  aeEventLoop *eventLoop;

  int i;

  if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

  eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);

  eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

  if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;

  eventLoop->setsize = setsize;

  eventLoop->lastTime = time(NULL);

  eventLoop->timeEventHead = NULL;

  eventLoop->timeEventNextId = 0;

  eventLoop->stop = 0;

  eventLoop->maxfd = -1;

  eventLoop->beforesleep = NULL;

  if (aeApiCreate(eventLoop) == -1) goto err;

  for (i = 0; i < setsize; i++) // 初始化所有事件结构的感兴趣事件为AE_NONE

    eventLoop->events[i].mask = AE_NONE;

  return eventLoop;

​

  err: // 发生错误,释放分配内存

  if (eventLoop) {

    zfree(eventLoop->events);

    zfree(eventLoop->fired);

    zfree(eventLoop);

 }

  return NULL;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

aeCreateFileEvent

该函数用于向事件循环中添加fd,而对于Linux而言,一切皆文件,所以这里fd指的是socket fd,也即客户端连接。我们可以看到这里首先调用 aeApiAddEvent 操作实际的多路复用器,然后根据感兴趣事件:读或者写,设置 rfileProc 或者 wfileProc函数指针,当监听的fd的对应事件发生时,将会回调该函数指针,随后将客户端与fd绑定的数据指针 clientData 保存在 aeFileEvent 的 clientData 成员中。源码如下。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData){

  if (fd >= eventLoop->setsize) return AE_ERR;

  aeFileEvent *fe = &eventLoop->events[fd];if (aeApiAddEvent(eventLoop, fd, mask) == -1)

    return AE_ERR;

  fe->mask |= mask;

  if (mask & AE_READABLE) fe->rfileProc = proc;

  if (mask & AE_WRITABLE) fe->wfileProc = proc;

  fe->clientData = clientData;

  if (fd > eventLoop->maxfd) // 保存最后的fd,fd为file打开文件的下标,所以这里保存最大值即可

    eventLoop->maxfd = fd;

  return AE_OK;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

aeDeleteFileEvent

该函数将操作实际多路选择器删除监听的fd。首先获取 fd 对应的 aeFileEvent结构,然后取消该fd的感兴趣事件集合,由 mask 变量指定,如果当前fd为最大fd且感兴趣事件集为AE_NONE,那么当调用aeApiDelEvent函数后,将会从多路复用器中解除监听,此时需要更新maxfd,如果不是 最大fd,那么直接删除即可。源码如下。

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)

{

  if (fd >= eventLoop->setsize) return;

  aeFileEvent *fe = &eventLoop->events[fd];if (fe->mask == AE_NONE) return;

  fe->mask = fe->mask & (~mask);

  if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { // 更新 maxfd

    int j;

    for (j = eventLoop->maxfd-1; j >= 0; j--) // 从后往前遍历,找到最后一个感兴趣事件集合(监听事件集合)不为空的fd

      if (eventLoop->events[j].mask != AE_NONE) break;

    eventLoop->maxfd = j;

 }

  aeApiDelEvent(eventLoop, fd, mask);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

aeCreateTimeEvent

该函数用于向事件循环添加时间事件,注意:操作系统提供的多路复用将不涉及到时间事件的处理,所以对于该事件需要Redis自身完成处理。我们可以看到首先 创建 aeTimeEvent 结构,随后调用 aeAddMillisecondsToNow 计算时间到期时间,然后将 aeTimeEvent 结构采用头插法放入到 eventLoop->timeEventHead 中,其他 aeTimeEvent 将通过 当前 aeTimeEvent 的 next 变量关联。源码如下。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,

              aeTimeProc *proc, void *clientData,

              aeEventFinalizerProc *finalizerProc)

{

  long long id = eventLoop->timeEventNextId++;

  aeTimeEvent *te;

  te = zmalloc(sizeof(*te));

  if (te == NULL) return AE_ERR;

  te->id = id;

  aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);

  te->timeProc = proc;

  te->finalizerProc = finalizerProc;

  te->clientData = clientData;

  // 头插法

  te->next = eventLoop->timeEventHead;

  eventLoop->timeEventHead = te; 

  return id;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

aeDeleteTimeEvent

该方法用于从时间事件链表中删除 指定 timeEventNextId 的事件,我们看到 该方法将会便利整个链表(通过头节点遍历),找到对应id的事件时,将其从链表中摘除,同时看看是否存在finalizerProc,调用该函数完成清理。源码如下。

int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)

{

  aeTimeEvent *te, *prev = NULL;

​

  te = eventLoop->timeEventHead;

  while(te) { // 遍历链表找到 timeEventNextId 为 id 的事件删除

    if (te->id == id) {

      if (prev == NULL)

        eventLoop->timeEventHead = te->next;

      else

        prev->next = te->next;

      if (te->finalizerProc)

        te->finalizerProc(eventLoop, te->clientData);

      zfree(te);

      return AE_OK;

   }

    prev = te;

    te = te->next;

 }

  return AE_ERR; /* NO event with the specified ID found */

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

aeMain

该函数之前我们看到过,将会检测多路复用器完成事件的处理。源码如下。

void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  while (!eventLoop->stop) {

    if (eventLoop->beforesleep != NULL)

      eventLoop->beforesleep(eventLoop);

    aeProcessEvents(eventLoop, AE_ALL_EVENTS);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

aeProcessEvents

该函数为事件处理的主要函数。详细描述如下。

// 执行事件时的标识宏

#define AE_FILE_EVENTS 1

#define AE_TIME_EVENTS 2

#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) // 同时执行文件和时间事件

#define AE_DONT_WAIT 4int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

  int processed = 0, numevents;

  // 若不处理任何事件,那么直接返回

  if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

  if (eventLoop->maxfd != -1 || // 存在监听的fd

   ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { // 执行时间事件且可以执行等待

    int j;

    aeTimeEvent *shortest = NULL;

    struct timeval tv, *tvp;

    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 找到最近需要执行的时间事件

      shortest = aeSearchNearestTimer(eventLoop);

    if (shortest) { // 存在最近需要执行的时间时间

      long now_sec, now_ms;

      aeGetTime(&now_sec, &now_ms);// 获取当前时间( 通过 gettimeofday(&tv, NULL)函数 )

      tvp = &tv;

      // 计算需要执行的时间,保存在 tv_sec 与 tv_usec变量中

      tvp->tv_sec = shortest->when_sec - now_sec; 

      if (shortest->when_ms < now_ms) {

        tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;

        tvp->tv_sec --;

     } else {

        tvp->tv_usec = (shortest->when_ms - now_ms)*1000;

     }

      // 当前时间便是 shortest 时间事件的触发时间

      if (tvp->tv_sec < 0) tvp->tv_sec = 0;

      if (tvp->tv_usec < 0) tvp->tv_usec = 0;

   } else {

      // 不存在需要执行的时间时间,且AE_DONT_WAIT 表示不需要等待,那么设置tv_sec与tv_usec为0,将不执行等待,否则 设置 timeval tvp 为空,此时将无限期等待

      if (flags & AE_DONT_WAIT) {

        tv.tv_sec = tv.tv_usec = 0;

        tvp = &tv;

     } else {

        tvp = NULL;

     }

   }

    numevents = aeApiPoll(eventLoop, tvp); // 将计算好的tvp调用 多路复用器的封装函数,注意:tvp 包含 等待时间,考虑下:如果最近的时间事件在1s后发生,那么我们应该让 操作系统 在等待 1s 后 还没有事件发生 也需要返回,此时用tvp来表示这个时间

    for (j = 0; j < numevents; j++) { // 若存在触发的事件,那么遍历这些事件,根据事件类型调用rfileProc或者wfileProc函数完成处理

      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

      int mask = eventLoop->fired[j].mask;

      int fd = eventLoop->fired[j].fd;

      int rfired = 0;

      if (fe->mask & mask & AE_READABLE) {

        rfired = 1;

        fe->rfileProc(eventLoop,fd,fe->clientData,mask);

     }

      if (fe->mask & mask & AE_WRITABLE) {

        if (!rfired || fe->wfileProc != fe->rfileProc)

          fe->wfileProc(eventLoop,fd,fe->clientData,mask);

     }

      processed++;

   }

 }

  // 检测时间时间,完成时间时间的处理

  if (flags & AE_TIME_EVENTS)

    processed += processTimeEvents(eventLoop);

  return processed;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

processTimeEvents

该函数用于执行时间事件。详细描述如下。

static int processTimeEvents(aeEventLoop *eventLoop) {

  int processed = 0;

  aeTimeEvent *te;

  long long maxId;

  time_t now = time(NULL); // 获取当前时间

  if (now < eventLoop->lastTime) { // 发生时钟偏移,也即 系统时间回退到之前的时间,那么redis的作者认为需要:提前执行所有时间事件,也远比 时间事件滞后 合理。

    te = eventLoop->timeEventHead;

    while(te) { // 遍历所有时间事件,将它们的触发时间设置为0,表示立即触发

      te->when_sec = 0;

      te = te->next;

   }

 }

  eventLoop->lastTime = now; // 记录当前执行时间

  te = eventLoop->timeEventHead;

  maxId = eventLoop->timeEventNextId-1;

  while(te) { // 循环遍历时间事件,找到可以执行的时间事件回调timeProc函数(是不是感觉贼蠢?但是你要考虑下这种时间事件很少,如果多了 优先级队列 用于增加插入时间,减少获取时间将比较合适~)

    long now_sec, now_ms;

    long long id;if (te->id > maxId) {

      te = te->next;

      continue;

   }

    aeGetTime(&now_sec, &now_ms);

    if (now_sec > te->when_sec ||

     (now_sec == te->when_sec && now_ms >= te->when_ms))

   {

      int retval;

​

      id = te->id;

      retval = te->timeProc(eventLoop, id, te->clientData);

      processed++;

      // 根据返回值决定是否继续执行

      if (retval != AE_NOMORE) {

        aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);

     } else {

        aeDeleteTimeEvent(eventLoop, id);

     }

      te = eventLoop->timeEventHead;

   } else {

      te = te->next;

   }

 }

  return processed;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

aeWait

该函数用于等待指定的fd发生事件:读、写、异常(通常用于 哨兵模式,slave 等待 master时使用)。源码描述如下。

int aeWait(int fd, int mask, long long milliseconds) {

  struct pollfd pfd;

  int retmask = 0, retval;

  memset(&pfd, 0, sizeof(pfd));

  pfd.fd = fd;

  // 根据等待事件类型设置POLLIN或者POLLOUT

  if (mask & AE_READABLE) pfd.events |= POLLIN;

  if (mask & AE_WRITABLE) pfd.events |= POLLOUT;

  // 调用poll函数完成等待

  if ((retval = poll(&pfd, 1, milliseconds))== 1) {

    if (pfd.revents & POLLIN) retmask |= AE_READABLE;

    if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;

    if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;

    if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;

    return retmask;

 } else {

    return retval;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37