# Redis 深度分析系列三
前面我们详细描述了Redis的功能:内存数据库,用于作为后端数据库的缓存,同时由于其需要放入各种形式的数据,为了保证灵活性,其内部也通过不同的数据结构来存储这些数据,开发人员可以根据需要来选择对应的数据结构完成其业务处理。我们也在启动流程中找到了执行入口代码,还有整体的数据结构初始化。那么本问将详细介绍Redis的事件驱动模型,也即:单处理线程模型。
事件循环原理
何为事件循环?很简单:线程、事件。我们考虑下Java的线程池:多个线程、一个任务队列,如果我们创建一个Event接口,然后根据事件类型的不同实现该接口,然后通过线程池的接口提交给线程池,线程池获取到事件对象后,根据事件类型完成操作即可。 代码如下所示,我们让事件类实现Runnable接口完成自身的事件处理。当我们不断的放入不同事件,那么线程池将不断执行这些事件,而这称之为事件循环。也即事件驱动模型-----根据事件完成对应业务。
// 这里用伪代码表示
interface Event{
}
class EventA extends Event implements Runnable{}
class EventB extends Event implements Runnable{}
ThreadPoolObj.submit(new EventA());
ThreadPoolObj.submit(new EventB());
2
3
4
5
6
7
8
9
10
11
12
13
14
15
从上面例子中我们可以看到:当多个事件需要处理相同数据时,这时由于线程池中可能存在多个线程同时处理,为了保证数据一致性,那么我们需要建立互斥区来保证多线程安全,那么由于上锁和解锁的性能损耗,我们又想到:将不相关的事件并行处理,数据相关的事件串行处理,如何做呢?启动多个单线程池,提交任务时将数据相关事件放入同一个单线程池,不相关的放入不同线程池即可。这就是事件循环的核心!很简单对吧?但是,有时我们会看到线程的业务与事件是耦合的,它可能长这样,但这并不影响事件循环的思想,怎么实现业务取决于你~
TaskQueue queue = new BlockingQueue(); // 因为队列涉及到提交线程与执行线程,所以需要互斥
new Thread(()->{
switch(queue.take()){
case: task1
case: task2
}
}).start();
2
3
4
5
6
7
8
9
10
11
12
13
Redis 事件循环设计
Redis 的功能从宏观上看极其简单:从内核中获取三次握手成功的socket fd(以后简称fd,因为VFS的存在,导致了Linux中将所有事物都称为文件描述符:fd),然后根据fd中的数据调用命令完成操作,注意:这里的操作在内存中完成,然后将响应内容返回给客户端。总所周知,内存的速度访问大于网络的访问速度,这时由于竹筒效应,Redis的性能取决于网络IO的速度。
那么,我们来进一步量化它们之间的速度差异,看下图。主存访问操作只需要100ns,而对于1GB的网络来说,发送2K却需要20000ns,嗯,很直观,真的慢。那么,假如 Redis 在完成一次用户操作需要500ns(包括内部操作与缓存行的延迟),那么一个线程一秒钟可以执行多少操作呢?1s / 500ns = 1000000000 ns / 500ns = 2,000,000 个操作,这是相当的快。所以,读者考虑下:在Redis中,有必要使用多线程来完成处理么?(考虑下 上锁和解锁 的性能损耗!)
于是乎,Redis 就使用单线程周而复始(事件循环的定义)的完成以下操作:
- 从内核中获取fd
- 解析fd中的数据
- 根据数据识别命令完成操作
- 将返回数据写回客户端
Redis 事件循环接口设计
我们知道,事件循环对于应用层而言非常好实现,但对于涉及到操作系统的网络IO而言,要想实现一个单线程的事件循环不是那么简单:读、写事件如果阻塞,将会导致整个事件循环停止。那么这就需要操作系统提供非阻塞的支持,当然我们也知道这些支持在各个操作系统中的实现不一样,而我们主要研究的操作系统是Linux,所以我们就只讨论:select、poll、epoll 这三个操作系统提供的函数接口,来完成我们的事件循环设计(注意:三者的区别与异同,笔者将在后面的文章中 通过 Linux 内核的源码来进行分析,这里主要是事件循环设计,不需要了解其中的异同,只需要知道:需要适配三个接口即可)。那么接下来我们就来详细看看Redis 为了适配 事件循环 和 三个不同实现的设计,在面向对象高级语言中通常有抽象类和接口来抽象,但是正如混沌学堂里说的:C语言仅仅是汇编简单抽象而已,所以它没有接口,但是我们也说了:可以通过 数据指针、函数指针 来实现类似接口的功能:解耦合。对于C语言指针的原理可以参考混沌学堂视频,这里就不再赘述。
aeEventLoop 结构体
aeEventLoop结构体定义了多个指针用于抽象三个不同事件循环的共同特性。具体描述如下。
// 定义 执行事件循环选择函数前,回调的函数指针原型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef struct aeEventLoop {
int maxfd; /* 当前注册到事件循环的最高文件描述符(在linux中一切皆文件,而fd用于作为数组下标映射到file文件结构,这里把它当作最大数组下标即可) */
int setsize; /* 注册到事件循环中的文件描述符的最大数目*/
long long timeEventNextId; // 用于支撑时间事件获取id值
time_t lastTime; /* 用于检测系统时钟偏差 */
aeFileEvent *events; /* 注册到事件循环组中事件(我们说:数组就是指针,所以这里的指针指向的内存空间解释为aeFileEvent数组,记住:指针类型决定内存空间如何解释,详细描述参考混沌学堂视频描述) */
aeFiredEvent *fired; /* 在事件循环组注册的fd中产生的事件,也即准备触发的事件,为events的子集 */
aeTimeEvent *timeEventHead; // 指向时间事件的头部指针
int stop; // 标识事件循环是否已经停止运行
void *apidata; /* 用于轮询特定于API中的数据 */
aeBeforeSleepProc *beforesleep; // 注册在执行事件循环选择函数前,回调的函数指针,因为执行三种系统调用,可能会导致线程在内核中阻塞
} aeEventLoop;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
aeFiredEvent 结构体
该结构体很明显用于保存一个由注册到事件循环中的fd产生的事件描述。
typedef struct aeFiredEvent {
int fd; // 所属fd
int mask; // 掩码,用于描述事件类型
} aeFiredEvent;
2
3
4
5
6
7
aeTimeEvent 结构体
该结构体用于描述一个时间事件。详细描述如下。
// 回调时间事件的函数指针原型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
// 删除时间事件时,回调清理的函数指针原型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef struct aeTimeEvent {
long long id; /* 时间事件标识符 */
long when_sec; /* 触发时间 秒 */
long when_ms; /* 触发时间 毫秒 */
aeTimeProc *timeProc; // 当达到执行时间时,回调执行的函数指针
aeEventFinalizerProc *finalizerProc; // 当删除时间事件时,回调的清理操作函数指针
void *clientData; // 保存自定义的关联数据指针
struct aeTimeEvent *next; // 用于单链表指向下一个时间事件结构
} aeTimeEvent;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
aeFileEvent 结构体
该结构体用于表示一个文件事件,也即 socket 事件,因为linux中一切皆文件,所以socket也是文件,所以这里将其命名为FileEvent。描述如下。
// 读写位定义
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2
// 定义发生读写事件时,回调的函数指针原型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef struct aeFileEvent {
int mask; /* 用于保存 AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc; // 当读事件发生时回调的读操作函数指针
aeFileProc *wfileProc; // 当写事件发生时回调的写操作函数指针
void *clientData; // 保存自定义的关联数据指针
} aeFileEvent;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
事件循环操作函数原型
C语言中使用函数指针来完成解耦合,这里由于并不知道具体操作的事件循环的类别,所以预定义操作事件循环的函数指针原型。详细描述如下。不难看出就是对不同事件类型和事件循环CRUD。
aeEventLoop *aeCreateEventLoop(int setsize); // 定义创建事件循环的函数指针原型
void aeDeleteEventLoop(aeEventLoop *eventLoop);// 定义删除事件循环的函数指针原型
void aeStop(aeEventLoop *eventLoop); // 定义停止事件循环的函数指针原型
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData); // 定义创建文件事件的函数指针原型
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); // 定义删除文件事件的函数指针原型
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);// 定义获取文件事件的函数指针原型
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc); // 定义创建时间事件的函数指针原型
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id); // 定义删除时间事件的函数指针原型
int aeProcessEvents(aeEventLoop *eventLoop, int flags); // 定义删除时间事件的函数指针原型
int aeWait(int fd, int mask, long long milliseconds); // 定义等待事件循环产生事件的函数指针原型
void aeMain(aeEventLoop *eventLoop); // 定义执行事件循环的函数指针原型
char *aeGetApiName(void);// 定义获取事件循环实际实现的种类名(select、poll、epoll)的函数指针原型
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); // 定义设置事件循环阻塞前回调的函数指针原型
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Redis 事件循环接口实现
上面我们描述了接口定义,接下来看看redis对于Linux系统提供的三个多路复用的函数的实现细节,这里读者需要明白:多路复用是OS实现的,要研究其原理,必然得进入到Linux内核,由于本系列为Redis源码描述,为了减少篇幅和不必要的知识,所以笔者这里省略掉这三种的异同,将会在内核系列中进行更新每个函数的底层原理,读者只需要知道:Epoll使用较多即可。同时由于具体实现细节在内核,所以Redis仅仅就是对系统提供的函数进行调用。
ae_select 实现
以下代码为select函数的具体调用实现。详细描述如下。
typedef struct aeApiState {
fd_set rfds, wfds; // 读写fd集合
fd_set _rfds, _wfds;// fd集的副本,用于放入select函数中进行选择,因为在select()之后重用fd集是不安全的(考虑下选择过程中我又添加了fd会如何)
} aeApiState;
// 创建select事件循环
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState)); // 分配aeApiState内存
if (!state) return -1;
// 对aeApiState内存中读写fd集合清零
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
eventLoop->apidata = state; // 将aeApiState指针保存在特定实现下的apidata中
return 0;
}
// 释放事件循环内存
static void aeApiFree(aeEventLoop *eventLoop) {
zfree(eventLoop->apidata);
}
// 向事件循环中添加fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
// 根据事件类型:读或者写,放入对应的集合
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}
// 从事件循环中删除fd
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
// 根据事件类型:读或者写,从对应的集合中删除
if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}
// 获取已经产生事件的fd,tvp用于当没有事件时等待的事件描述(可以从源码中看到:select函数效率极其低下,每次选择完毕后,还得遍历所有的fd找到产生事件的fd,因为select函数本身只会告诉你:产生了事件,谁产生了不知道)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
// 首先将rfds和wfds复制到_rfds和_wfds,用于select函数使用
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
// 将_rfds与_wfds ,也即读写事件放入到select函数中,由内核对其中的fd的读写事件进行监测
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) { // 有事件产生
for (j = 0; j <= eventLoop->maxfd; j++) { // 遍历fd找到发生事件的fd并将它们放入到 eventLoop->fired 中
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue; // 当前fd没有事件发生
// 根据发生事件的类型将mask对应位设置,并放入eventLoop->fired中,由于eventLoop->fired是一个数组,所以直接使用numevents来表示数组下标即可
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
// api名字
static char *aeApiName(void) {
return "select";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
ae_poll 实现
由于poll函数与select函数一样,只不过不像select那样存在fd的大小限制,所以redis并没有实现poll函数的封装。
ae_epoll 实现
epoll为linux中多路复用的首选,也是性能最高的实现,它跟select不一样,将会告诉你:产生了多少事件,那些fd产生了事件,而不需要你遍历所有fd找到发生事件的fd,同时也不需要像select一样,每次选择都需要将所有fd放入内核,epoll只需要注册到内核中即可,除非你显示删除它,否则将一直监听它的事件。详细描述如下。
typedef struct aeApiState {
int epfd; // 一切皆文件,所以epoll的打开,也等于打开了一个文件,所以这里保存epoll fd
struct epoll_event *events; // 保存事件数组指针
} aeApiState;
// 创建epoll fd
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState)); // 分配aeApiState内存
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); // 分配用于保存事件的数组
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 创建epoll fd,注意这里的1024仅仅给内核一个提示,大概会有1024个,至于内核如何处理,取决于实现,甚至可以忽略它 */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state; // 将aeApiState指针放入apidata特定数据项
return 0;
}
// 关闭epoll,这里首先关闭内核打开的epoll fd,然后释放内存
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->epfd);
zfree(state->events);
zfree(state);
}
// 向 epoll 中添加需要监听的fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
// 如果fd已经被监听,那么我们需要一个MOD修改操作,否则我们需要一个ADD添加操作。
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
// 将之前的事件类型于新的事件类型混合。EPOLLIN表示监听读事件、EPOLLOUT表示监听写事件
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; // 避免valgrind警告
ee.data.fd = fd; // 保存fd
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; // 调用系统调用函数修改或者添加fd
return 0;
}
// 从epoll 中删除 fd
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
// 如果只删除fd某个监听事件,那么直接取反与即可
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; // 避免valgrind警告
ee.data.fd = fd; // 保存fd
if (mask != AE_NONE) { // 如果仍然存在需要监听的事件类型,因为我们可能只移除某个事件类型,比如读,可以保留写,此时需要MOD修改操作
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
// 否则表示需要将fd直接从epoll中移除,那么调用DEL操作。注意,内核版本如果小于2.6.9需要一个非空的事件指针,即使是DEL删除操作,所以这里传入了&ee
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
// 获取已经产生事件的fd,tvp用于当没有事件时等待的事件描述
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 调用epoll_wait系统调用,获取已经产生事件的fd,state->events用于接收这些fd,eventLoop->setsize用于指示接收的大小,通常等于state->events的长度,tvp用于表示等待事件的事件,这里需要转换单位(因为epoll_wait需要的参数为 毫秒):tv_sec*1000(秒转为毫秒) +tv_usec/1000(微秒转为 毫秒)
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) { // 遍历已经发生事件的fd,根据类型设置AE_READABLE与AE_WRITABLE,这里需要注意:EPOLLOUT(写事件)、EPOLLERR(监听的文件描述符上发生了错误事件)、EPOLLHUP(监听的文件描述符上发生了挂起事件) 均属于AE_WRITABLE,此时就需要在使用这些fd时,进行写入检测,这点我们后面会看到
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
static char *aeApiName(void) {
return "epoll";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191