# LInux 2.6 Epoll 原理分析

Epoll函数用于实现Java和众多语言底层的IO多路复用,我们可以创建一个epoll对象,然后将我们的fd(File Descriptor 文件描述符对象)注册到epoll对象中,之后我们可以通过调用epoll对象的查询函数来获取准备好事件的fd对象(Java Coder可以参考下Selector、Channel的使用方式,C Coder不用我介绍了吧)。本文用于详细解释在Linux2.6版本时,Epoll的三大函数的源码与原理分析,详细说明其中的设计技巧。Linux 现在发展到了非常高的版本,本文选取最开始的Epoll实现,此版本没有经过任何优化的修改,最为纯粹,方便读者理解。后面有时间再详细的编写高版本对于Epoll的优化,包括红黑树的引入等等。本文稍长,需静下心来理解之。

三大函数原型

epoll_create函数

该函数用于创建一个代表Epoll对象的fd。我们知道Linux继承了Unix的优良血统,将一切均视为文件对象处理,这里的文件有两层含义:真实的磁盘上的文件、虚拟文件。而我们这里的Epoll便是一个虚拟文件对象。该函数的参数定义如下:

  1. __size参数:用于提供给内核一个提示,当前需要监听的fd个数,具体怎么做由内核来处理
  2. 返回值:代表当前Epoll对象的fd

int epoll_create (int __size);

epoll_ctl函数

该函数用于控制上面我们由epoll_create方法创建的Epoll 对象。我们可以通过该函数操作Epoll 实现添加、修改、删除监听的fd对象。参数定义如下:

  1. __epfd:表示我们通过epoll_create方法创建的Epoll 对象
  2. __op:表示操作类型:EPOLL_CTL_ADD(添加监听的fd)、EPOLL_CTL_DEL(删除监听的fd)、EPOLL_CTL_MOD(修改监听的fd)
  3. __fd:表示需要操作目标的fd对象
  4. __event:表示用于描述需要监听的fd对象的感兴趣事件类型
  5. 返回值:0表示成功,-1表示失败

int epoll_ctl (int __epfd, int __op, int __fd,struct epoll_event *__event);

epoll_wait函数

该函数用于获取Epoll对象监听的fd对象列表,我们可以通过该参数实现我们通过epoll_ctl添加到Epoll对象中监听的且准备好事件的fd对象。该函数的参数定义如下:

  1. __epfd:表示我们通过epoll_create方法创建的Epoll 对象
  2. __events:表示用于接收准备好事件的fd对象的缓冲区
  3. __maxevents:表示这一次调用可以接收多少准备好的fd对象,通常我们将该参数设置为events参数的长度
  4. __timeout:表示如果没有准备好的事件对象,那么等待多久返回
  5. 返回值:返回events缓冲区中有效的fd个数,也即准备好事件的fd个数

epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout);

epoll_event与epoll_data对象

epoll_data结构用于保存用户数据,epoll_event结构用于表示监听的fd对象的事件和用户数据。这里读者可以很容易的理解:Epoll需要监听fd并且需要指定监听的事件类型,而这时我们就需要epoll_event数据载体,同时我们可能需要设置一些与之关联的数据,比如fd对象,用户自己的数据ptr等等(Java Coder 可以参考下Selector和Channel绑定时可以指定Attachment一样)。详细描述如下。

typedef union epoll_data{

 void *ptr; // 保存数据指针

 int fd; // 监听的fd对象

 uint32_t u32; // 保存一个32位值

 uint64_t u64; // 保存一个64位值

} epoll_data_t;



struct epoll_event{

 uint32_t events; // 监听的事件类型:

 epoll_data_t data; // 用户数据载体

} __EPOLL_PACKED;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

边缘触发与水平触发

Epoll对象对于监听的fd处理方式有两种:边缘触发(edge-triggered)简称ET、 水平触发(level-triggered)简称LT。在ET模式下,当我们通过epoll_wait函数获取到准备好的事件后,如果没有处理完成所有事件,那么再次调用epoll_wait函数将不会再次返回该fd,而LT模式下如果fd的事件没有处理完成,那么在下一次调用epoll_wait函数时将会返回该fd。我们用一个例子来说明:

  1. 首先我们将一个读取数据的rfd注册到epoll对象中
  2. 然后我们往这个rfd中写入2KB的数据
  3. 当我们调用epoll_wait函数时,由于此时有2KB的数据可以读取,那么将会返回这个已经准备好事件的rfd
  4. 接着我们读取rfd中1KB的数据,此时注意rfd中还有1KB没有处理
  5. 然后我们再次调用epoll_wait函数

此时,如果我们使用添加rfd时指定EPOLLET,也即ET模式时,那么尽管rfd中还有1KB的数据可以读取,但是用于我们处于ET模式,此时再次调用epoll_wait函数将会阻塞当前线程,因为ET模式需要用户自己处理完当前事件的数据。当然如果我们使用EPOLLLT,那么此时再次调用epoll_wait函数那么将会返回剩余1KB读取的rfd对象。

使用示例

我们这里以一个服务端监听连接事件的listen_sock fd为例来说明Epoll如何使用。读者可以从例子中看到,其实就是创建Epoll对象,使用提供的三个函数进行CRUD(Java Coder可以类比NIO的ServerScoketChannel和ScoketChannel来学习,因为操作方式一模一样,只不过屏蔽了Epoll的细节)。

#define MAX_EVENTS 10 // 定义处理的最大事件个数

struct epoll_event ev, events[MAX_EVENTS]; // 初始化ev数据载体和接收准备好的fd的事件数组

int listen_sock, conn_sock, nfds, epollfd; // 初始化监听连接的listen_sock fd、客户端连接conn_sock fd、接收epoll_wait函数返回的准备好事件的个数变量nfds、Epoll对象epollfd fd

epollfd = epoll_create(10); // 创建epoll对象

if (epollfd == -1) {

 perror("epoll_create");

 exit(EXIT_FAILURE);

}

// 设置ev数据载体。设置感兴趣事件为EPOLLIN代表读事件

ev.events = EPOLLIN;

ev.data.fd = listen_sock; // 设置用户数据载体中的fd为listen_sock

if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) { // 将其添加到epoll监听列表中

 perror("epoll_ctl: listen_sock");

 exit(EXIT_FAILURE);

}

// 循环处理所有事件

for (;;) {

 nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1); // 查询epoll函数中是否有准备好的事件,我们这里使用events数组来接收准备好的事件fd,使用MAX_EVENTS来指定接收的最大事件数量,使用-1来表明timeout为无限期,也即当没有事件时阻塞当前进程

 if (nfds == -1) {

  perror("epoll_pwait");

  exit(EXIT_FAILURE);

 }

 for (n = 0; n < nfds; ++n) { // 循环处理已经准备好事件的fd

  if (events[n].data.fd == listen_sock) { // 我们这里以处理监听客户端连接的listen_sock fd为例说明

   conn_sock = accept(listen_sock,(struct sockaddr *) &local, &addrlen); // 接收客户端连接

   if (conn_sock == -1) {

    perror("accept");

    exit(EXIT_FAILURE);

   }

   setnonblocking(conn_sock); // 设置客户端fd为非阻塞模式

   ev.events = EPOLLIN | EPOLLET; // 设置触发模式为ET并且感兴趣事件类型为EPOLLIN读事件

   ev.data.fd = conn_sock; // 指定数据载体fd为conn_sock

   if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,&ev) == -1) { // 将其添加到Epoll监听对象中

    perror("epoll_ctl: conn_sock");

    exit(EXIT_FAILURE);

   }

  } else { // 如果是其他事件,我们还可以继续处理

   do_use_fd(events[n].data.fd);

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

小结

我们看到epoll_create函数用于创建对象,而epoll_ctl函数用于对Epoll对象增删改操作,epoll_wait用于对Epoll对象实现查询操作。epoll_event与epoll_data对象用于承载与Epoll对象交互的数据结构。而对于ET和LT触发而言,我们知道ET模式一个事件只会触发一次,如果在该事件中的数据没有处理完毕,那么再下一个事件到来时,将不会再次返回监听的fd。而对于LT而言如果数据没有处理完毕,那么我们将可以再次调用epoll_wait函数处理未处理完成数据的fd。

三大函数内核原理

在前面了解了如何使用Epoll后,本节将会从三个函数的Linux内核源码进行原理讲解。因为Linux中一切皆文件,但是由于文件系统是另外一个模块,本文肯定不会花大量篇幅介绍文件系统相关的概念,毕竟那样就跑题了,但是由于读者的水平参差不齐,还是有必要介绍一下VFS的相关概念。我们前面说到过Linux中一切皆文件,而对于文件来说有在磁盘上真实存在的文件和虚拟文件,比如Epoll文件。为了兼容这两者,Linux提出了VFS的虚拟文件系统,用其作为访问文件系统的抽象层,而对于应用而言只需要面对VFS即可,而不需要知道底层是虚拟文件还是真实文件,反正一切皆文件,按文件形式来处理即可。这时就引入了file结构体,该结构体代表了一个文件对象,而inode结构体用于表示文件的元数据信息,也称之为index node 索引节点。每个进程都需要打开文件,而这时就等同于创建了一个file对象,而我们不可能把file对象暴露给用户空间,读者一定要记住:内核永远不信任用户空间的数据,这就意味着从用户空间过来的数据都需要拷贝到内核中才能使用(当然对于基本数据类型变量我们只需要强转为不会造成内核瘫痪的数据即可,比如下面的size变量)。而对于从内核到用户空间的数据,也需要拷贝,同时不会将内核的数据结构暴露给用户空间,这时就需要一个映射,将file文件对象映射到一个整形变量,将这个整形变量返回给用户空间,而用户空间如果要操作file对象,那么传入该fd,内核就可以反映射到file文件对象操作即可,这个映射信息也即保存在进程的PCB控制块中,对于Linux而言就是task_struct(在不久将来,作者将会基于该版本详细介绍进程管理模块,读者这里了解下即可)。

sys_epoll_create原理

该函数用于创建epoll对象同时返回fd。我们看到首先通过ep_get_hash_bits函数计算出hashbits变量,随后调用ep_getfd函数创建一个Epoll对象,并且将其与Epoll fd关联。最后调用ep_file_init函数初始化epoll对象。对于Java Coder而言,可能不太熟悉goto语句,但是还是需要习惯一下,这里我们看到定义了两个退出点:eexit_2、eexit_1,分别用于在不同错误下进行返回执行清理工作或者打印错误信息。详细描述如下(再次强调一下:Linux中一切皆文件,Epoll对象也是文件)。

long sys_epoll_create(int size){

 int error, fd;

 unsigned int hashbits;

 struct inode *inode;

 struct file *file;

 // 根据传入的hint size(提示大小)计算出hash位数

 hashbits = ep_get_hash_bits((unsigned int) size);

 // 创建一个Epoll对象,并且将其与Epoll fd关联

 error = ep_getfd(&fd, &inode, &file);

 if (error)

  goto eexit_1;

 // 初始化Epoll对象

 error = ep_file_init(file, hashbits);

 if (error)

  goto eexit_2;

 return fd; // 返回fd

 eexit_2:

 sys_close(fd);

 eexit_1:

 DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",

    current, size, error));

 return error;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

ep_getfd函数原理

该函数用于初始化efd指针、einode指针、efile指针,分别表示为Epoll对象的fd、Epoll对象的文件的元数据对象、文件对象,同时将efd与dfile文件对象进行了关联。由于篇幅有限,并且又会涉及到内存模块相关的知识,所以作者这里并没有将ep_eventpoll_inode、d_alloc等等这些与VFS相关的内容展开,因为那并不属于Epoll的研究范畴了,我们只需要从该函数中注意到:file->f_op = &eventpoll_fops这行代码即可。我们知道一个文件是需要操作的,而这些操作的函数便在eventpoll_fops结构中,当我们调用文件对象的这些函数时,由于我们设置了地址为eventpoll_fops,所以将会调用这里面的函数进行操作。对于Java Coder而言,这就是C语言中的接口。我们定义了一堆接口方法,那么需要实现,而eventpoll_fops就是实现。对于C语言而言并没有接口的概念,不过C语言有指针,我们设置函数指针指向不同的函数便实现了抽象的过程,这也是VFS的核心。详细实现如下。

static int ep_getfd(int *efd, struct inode **einode, struct file **efile)

{

 struct qstr this;

 char name[32];

 struct dentry *dentry; // Epoll文件目录对象

 struct inode *inode; // Epoll索引节点对象

 struct file *file; // Epoll文件对象

 int error, fd;

 error = -ENFILE;

 file = get_empty_filp(); // 获取一个空的文件对象,在这里面我们就会进行max_files打开的最大文件对象的限制检测,同时分配Epoll文件对象file

 if (!file)

  goto eexit_1;

 inode = ep_eventpoll_inode(); // 接着我们分配Epoll索引节点

 error = PTR_ERR(inode);

 if (IS_ERR(inode))

  goto eexit_2;

 // 然后从进程的files_struct结构中获取一个空闲的fd,在这里面我们就需要进行进程打开的最大文件描述符限制的检测了,在当前版本中的限制为:#define INR_OPEN 1024

 error = get_unused_fd();

 if (error < 0)

  goto eexit_3;

 fd = error;

 error = -ENOMEM;

 sprintf(name, "[%lu]", inode->i_ino);

 this.name = name;

 this.len = strlen(name);

 this.hash = inode->i_ino;

 dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this); // 分配目录节点

 if (!dentry)

  goto eexit_4;

 dentry->d_op = &eventpollfs_dentry_operations; // 设置目录操作

 d_add(dentry, inode);

 // 设置file对象文件属性

 file->f_vfsmnt = mntget(eventpoll_mnt);

 file->f_dentry = dget(dentry);

 file->f_pos = 0;

 file->f_flags = O_RDONLY;

 file->f_op = &eventpoll_fops; // 我们关注这里即可,初始化了文件对象的基础操作回调函数

 file->f_mode = FMODE_READ;

 file->f_version = 0;

 file->private_data = NULL;

 // 将fd与file对象进行关联,读者这里就把fd当成数组下标,然后数组中的对象为file即可

 fd_install(fd, file);

 // 将efd、einode、efile指针指向前面分配的三大结构的地址

 *efd = fd;

 *einode = inode;

 *efile = file;

 return 0;

 eexit_4:

 put_unused_fd(fd);

 eexit_3:

 iput(inode);

 eexit_2:

 put_filp(file);

 eexit_1:

 return error;

}



// Epoll文件操作的函数结构体

static struct file_operations eventpoll_fops = {

 .release = ep_eventpoll_close,

 .poll  = ep_eventpoll_poll

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

ep_file_init函数原理

我们在ep_getfd函数中看到的都是VFS相关的对象,而我们说Epoll文件是一个虚拟文件,那么虚拟文件就需要一个数据载体来表示Epoll结构,而eventpoll结构,便是这样一个用来实际操作Epoll的核心数据结构之一。我们看到首先分配一个eventpoll结构的空间,随后调用ep_init函数初始化该结构,为了保持简单明了,作者这里就不展开该方法讲解,我们在下一小节直接看该结构的数据定义即可。详细描述如下。

static int ep_file_init(struct file *file, unsigned int hashbits)

{

 int error;

 struct eventpoll *ep;

 // 首先调用内存管理模块的kmalloc函数分配一个eventpoll结构

 if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))

  return -ENOMEM;

 memset(ep, 0, sizeof(*ep)); // 对分配的内存进行清0

 error = ep_init(ep, hashbits); // 初始化eventpoll结构

 if (error) {

  kfree(ep);

  return error;

 }

 file->private_data = ep; // 将文件对象关联到当前eventpoll结构

 return 0;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

eventpoll结构原理

该结构存放在efile文件对象的private_data中,再次强调一遍:Linux一切皆文件,此时需要真实的文件载体,而该结构就是这个载体,可能读者会问如果不介入VFS,是不是直接用该结构就可以了?答案是的。详细的参数描述如下。

struct eventpoll {

 rwlock_t lock; // 保护该结构的读写锁

 struct rw_semaphore sem; // 用于保护eventpoll文件对象的读写信号量

 wait_queue_head_t wq; // sys_epoll_wait()函数时,用于保存阻塞进程的等待队列

 wait_queue_head_t poll_wait; // 用于调用file->poll()函数时,阻塞进程的等待队列

 struct list_head rdllist; // 已经准备好事件的fd列表

 unsigned int hashbits; // 通过传入的size计算的hash位数

 char *hpages[EP_MAX_HPAGES]; // 用于存放struct epitem的数据页

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

epitem结构原理

该结构用于表示添加到Epoll中的文件信息,每一个添加到Epoll中监听的fd都会拥有这样的一个结构。

struct epitem { struct list_head llink; // 用于将该epitem结构关联到对应的Epoll对象中 struct list_head rdllink; // 用于将该epitem结构关联到对应的Epoll对象中的rdllist准备好事件的fd列表中 int nwait; // poll操作的活动等待队列数 struct list_head pwqlist; // 包含轮询等待队列的列表 struct eventpoll *ep; // 所属epoll对象 int fd; // 关联的文件fd struct file *file; // 关联的文件对象 struct epoll_event event; // fd感兴趣事件集 atomic_t usecnt; // 保存当前结构的引用计数 struct list_head fllink; // 将该结构连接到struct file文件的items列表 struct list_head txlink; // 将该item连接到transfer转移列表 unsigned int revents; // 返回给用户空间的事件集 };

sys_epoll_ctl原理

该函数用于操作Epoll对象,对Epoll对象添加、删除、修改操作。我们看到首先通过epfd、fd取出epoll文件对象和操作的文件对象,然后进行校验,我们注意到只有支持tfile->f_op->poll操作的对象才可以用于epoll监听,随后我们根据op操作调用不同函数来操作epoll对象。同时读者应该需要知道epoll_event的事件结构中的events变量是一个整形变量,所以是按位作为标志位来开启不同的感兴趣事件,我们通过或运算符可以组合这些位。详细实现如下。

long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event){

 int error;

 struct file *file, *tfile;

 struct eventpoll *ep;

 struct epitem *epi;

 struct epoll_event epds;



 error = -EFAULT;

 if (copy_from_user(&epds, event, sizeof(struct epoll_event))) // 首先将用户空间传递的epoll_event拷贝到内核空间(内核不应该直接使用用户空间的数据)

  goto eexit_1;

 error = -EBADF;

 file = fget(epfd); // 获取epfd所代表的epoll文件对象(通过我们前面介绍的fd->file的映射)

 if (!file)

  goto eexit_1;

 tfile = fget(fd); // 获取要操作的fd文件对象

 if (!tfile)

  goto eexit_2;

 error = -EPERM;

 if (!tfile->f_op || !tfile->f_op->poll) // 检测要操作的tfile文件对象是否支持poll操作,不支持poll操作的文件对象不支持epoll操作

  goto eexit_3;



 error = -EINVAL;

 if (file == tfile || !IS_FILE_EPOLL(file)) // 检测下epoll的file对象是否为epoll文件

  goto eexit_3;

 ep = file->private_data; // 获取epoll文件对象的epoll核心结构体:eventpoll

 down_write(&ep->sem); // 获取写信号量

 epi = ep_find(ep, tfile, fd); // 先在epoll对象的监听文件中看看是否存在当前需要操作的fd的epitem结构(由于我们这里使用链表来管理epitem,所以我们这里通过遍历链表对比epitem的tfile和fd属性)

 error = -EINVAL;

 switch (op) {

  case EPOLL_CTL_ADD: // 添加操作

   if (!epi) {

    epds.events |= POLLERR | POLLHUP; // 自动添加ERR和HUP事件

    error = ep_insert(ep, &epds, tfile, fd); // 将监听文件对象插入到epoll对象中监听

   } else

    error = -EEXIST; // 只能添加一次

   break;

  case EPOLL_CTL_DEL: // 删除操作

   if (epi) // 如果epitem存在,那么执行移除

    error = ep_remove(ep, epi);

   else

    error = -ENOENT;

   break;

  case EPOLL_CTL_MOD: // 修改操作

   if (epi) {

    epds.events |= POLLERR | POLLHUP; // 自动添加ERR和HUP事件

    error = ep_modify(ep, epi, &epds);

   } else

    error = -ENOENT;

   break;

 }

 // 由于ep_find函数中,如果epi存在,那么增加了epitem的计数,那么现在需要释放这个计数,其实就是对usecnt变量原子性减1

 if (epi)

  ep_release_epitem(epi);

 up_write(&ep->sem); // 释放写信号量

 eexit_3:

 fput(tfile);

 eexit_2:

 fput(file);

 eexit_1:

 DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",

    current, epfd, op, fd, event, error));

 return error;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

ep_insert函数原理

该函数用于将监听的tfile文件对象放入epitem中,然后将其放入对应的链表中。我们看到首先初始化epi中的链表结构,接着初始化epitem结构体变量,然后初始化poll table的回调函数为ep_ptable_queue_proc,我们稍后就会详细介绍该回调函数的作用,以及其调用的原理。最后添加epitem到epoll的监听链表中,检测是否当前文件对象在添加到队列后,马上就发生了感兴趣事件,如果是这样,那么将其放入到epollevent的rdllink准备链表的末尾,并唤醒等待进程。详细实现如下。

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,

     struct file *tfile, int fd){

 int error, revents, pwake = 0;

 unsigned long flags;

 struct epitem *epi;

 struct ep_pqueue epq;

 error = -ENOMEM;

 if (!(epi = EPI_MEM_ALLOC())) // 分配一个新的epitem结构

  goto eexit_1;

 // 初始化epi中的链表结构

 INIT_LIST_HEAD(&epi->llink);

 INIT_LIST_HEAD(&epi->rdllink);

 INIT_LIST_HEAD(&epi->fllink);

 INIT_LIST_HEAD(&epi->txlink);

 INIT_LIST_HEAD(&epi->pwqlist);

 // 初始化epitem结构体变量

 epi->ep = ep;

 epi->file = tfile;

 epi->fd = fd;

 epi->event = *event;

 atomic_set(&epi->usecnt, 1); // 设置引用计数为1

 epi->nwait = 0;

 // 初始化poll table的回调函数为ep_ptable_queue_proc

 epq.epi = epi;

 init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

 // 执行目标文件的poll函数,将会回调ep_ptable_queue_proc函数

 revents = tfile->f_op->poll(tfile, &epq.pt);

 if (epi->nwait < 0)

  goto eexit_2;

 // 添加epitem到目标文件对象的epoll hook 链表

 spin_lock(&tfile->f_ep_lock);

 list_add_tail(&epi->fllink, &tfile->f_ep_links);

 spin_unlock(&tfile->f_ep_lock);

 write_lock_irqsave(&ep->lock, flags);

 // 添加epitem到epoll的监听链表中(注意我们这里使用的是基于hash表的链表,也即找到索引下标,然后链到不同的下标中,可以参考下hashmap的链地址法)

 list_add(&epi->llink, ep_hash_entry(ep, ep_hash_index(ep, tfile, fd)));

 // 如果此时文件已经准备好了且当前epi没有被放入到epollevent的rdllink准备链表的末尾

 if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {

  list_add_tail(&epi->rdllink, &ep->rdllist);

  // 如果有进程正在阻塞,那么唤醒进程处理该准备好的事件

  if (waitqueue_active(&ep->wq))

   wake_up(&ep->wq);

  if (waitqueue_active(&ep->poll_wait)) // 唤醒通过poll函数阻塞进程

   pwake++;

 }

 write_unlock_irqrestore(&ep->lock, flags);

 if (pwake)

  ep_poll_safewake(&psw, &ep->poll_wait);

 return 0;

 eexit_2:

 ep_unregister_pollwait(ep, epi);

 write_lock_irqsave(&ep->lock, flags);

 if (EP_IS_LINKED(&epi->rdllink))

  EP_LIST_DEL(&epi->rdllink);

 write_unlock_irqrestore(&ep->lock, flags);

 EPI_MEM_FREE(epi);

 eexit_1:

 return error;

}

接下来我们来看ep_pqueue结构体原理。我们看到ep_pqueue结构仅仅作为poll_table和epitem结构体的包装,而poll_table_struct结构体只是一个回调函数poll_queue_proc,我们可以看到该函数接收一个文件对象和wait_queue_head_t等待队列指针,poll_table_struct结构体,也即poll_table结构体。

struct ep_pqueue {

 poll_table pt;

 struct epitem *epi;

};

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {

 poll_queue_proc qproc;

} poll_table;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

接下来我们来看init_poll_funcptr(&epq.pt, ep_ptable_queue_proc)的原理。我们看到该函数仅仅是把poll_table的函数指针指向qproc。

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc){

 pt->qproc = qproc;

}
1
2
3
4
5

那么来看看传入的ep_ptable_queue_proc函数的实现。我们看到首先通过poll_table地址取出epitem(读者可以想想如何获取?我们在前面看到ep_pqueue结构中包含poll_table,而这个地址的下面便是epitem,我们可以怎么做?用poll_table的地址转为ep_pqueue结构体指针,然后直接取epitem即可),然后分配一个eppoll_entry结构体,然后调用init_waitqueue_func_entry初始化eppoll_entry的变量和回到函数为ep_poll_callback为ep_poll_callback,这时相当于把eppoll_entry的wait_queue_t的变量的回调函数设置为ep_poll_callback函数入口,同时将将eppoll_entry的wait_queue_t wait 节点添加到wait_queue_head_t whead的链表中(该等待节点我们已经设置了回调函数为ep_poll_callback,当数据可用时将会回调该函数),并且将eppoll_entry添加到epitem的pwqlist链表中,此时读者是否发现:我们既可以从操作的fd的等待链接表中回调wait中的函数,又可以从epitem节点中通过遍历pwqlist链表来获取到等待信息结构eppoll_entry?所以我们可以稍微抽一下这些共同点:将结构通过list_head结构和其变种wait_queue_head_t结构关联在一起,为何关联?查询操作嘛。详细实现如下所示。

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt){

 struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt); // 根据poll_table地址取出epitem

 struct eppoll_entry *pwq;

 if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) { // 分配eppoll_entry结构体

  init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // 初始化eppoll_entry的变量和回到函数为ep_poll_callback

  pwq->whead = whead; 

  pwq->base = epi;

  add_wait_queue(whead, &pwq->wait); // 将eppoll_entry中的wait_queue_t添加到wait_queue_head_t等待节点中

  list_add_tail(&pwq->llink, &epi->pwqlist); // 将eppoll_entry添加到epitem的pwqlist链表中

  epi->nwait++; // 增加等待计数

 } else {

  epi->nwait = -1;

 }

}



struct eppoll_entry {

 struct list_head llink; // 用于将该结构连接到struct epitem的pwqlist链表节点

 void *base; // 指向关联的epitem结构指针

 wait_queue_t wait; // 用于将该结构添加到目标文件的等待链表节点

 wait_queue_head_t *whead; // 用于保存当前eppoll_entry添加到目标文件的链表节点指针

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

我们看到这个回调函数就是初始化一个eppoll_entry设置回调函数为ep_poll_callback然后将其添加到了目标文件对象的等待节点中,那么我们如何知道这个链表节点是谁呢?是否还有一个疑惑:是谁来回调这个ep_ptable_queue_proc函数呢?我们在上面看到最后会调用tfile->f_op->poll(tfile, &epq.pt)的poll函数,那么我们这里以网络函数来看看poll函数做了什么?我们看到对于网络Socket文件来说,当我们将其插入到Epoll后,将会通过上面的poll函数调用sock_poll,该函数进一步调用tcp_poll(因为我们假定使用TCP协议),而在tcp_poll函数中调用了poll_wait函数,该函数我们看到就是回调ep_ptable_queue_proc。详细描述如下。

static unsigned int sock_poll(struct file *file, poll_table * wait) { struct socket *sock; sock = SOCKET_I(file->f_dentry->d_inode); return sock->ops->poll(file, sock, wait); // 通过预设的函数指针调用tcp_poll } unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait) { unsigned int mask; struct sock *sk = sock->sk; struct tcp_opt *tp = tcp_sk(sk); poll_wait(file, sk->sk_sleep, wait); // 回调poll_table *wait 的 qproc 函数 ... } static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && wait_address) p->qproc(filp, wait_address, p); }
1

作者一直说:talk is cheap ,show me the code。给出任何结论,作者都会给出论据。所以我们还有一个论据需要给出。我们说过在Socket fd中我们调用了tcp_poll函数,将eppoll_entry结构中的wait_queue_t wait结构添加到了sk->sk_sleep链表中,那么我们说会被回调其中的func函数,那么是怎样被回调呢?我们看以下代码。sock_init_data函数用于初始化sock结构,可能读者不清楚Linux的网络模块,会问:为什么会有一个socket和sock结构,这里稍微提一下,在Linux中兼容对BSD 对网络的规范,而这个socket结构就是general BSD socket结构,而我们知道,兼容是兼容,但毕竟我Linux可以有自己的socket,而这个sock就是Linux实现网络模块的结构。我们看到这里设置了多个回调函数,当sock发生对应事件时回调这些函数。我们这里以sk_state_change事件来举例,我们看到当sock的状态改变后,在sock_def_wakeup方法中调用wake_up_interruptible_all,该函数名为唤醒所有的进程,但是由于我们在Epoll添加过程中不存在进程的阻塞,所以设置了wait_queue_t的func为ep_poll_callback函数,所以最后在__wake_up_common链表遍历中调用了该回调函数。当然,sock其他的回调函数也是如此。详细描述如下。

void sock_init_data(struct socket *sock, struct sock *sk){

 ...

 sk->sk_state_change = sock_def_wakeup; // sock状态改变后回调

 sk->sk_data_ready = sock_def_readable; // sock数据可用时回调 

 sk->sk_write_space = sock_def_write_space; // sock写数据的空间可用时回调

 sk->sk_error_report = sock_def_error_report; // sock发生错误时回调

 ...

}



// sock状态改变后回调

void sock_def_wakeup(struct sock *sk){

 read_lock(&sk->sk_callback_lock); // 获取读锁

 if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) // 等待队列不为空,那么调用wake_up_interruptible_all唤醒sk_sleep链表中的等待节点

  wake_up_interruptible_all(sk->sk_sleep);

 read_unlock(&sk->sk_callback_lock);

}



\#define wake_up_interruptible_all(x) __wake_up((x),TASK_INTERRUPTIBLE, 0)// 这里的0表示非互斥唤醒,也即唤醒全部



void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive){

 unsigned long flags;

 spin_lock_irqsave(&q->lock, flags);

 __wake_up_common(q, mode, nr_exclusive, 0); // 调用该函数完成唤醒过程

 spin_unlock_irqrestore(&q->lock, flags);

}



static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int sync){

 struct list_head *tmp, *next;

 list_for_each_safe(tmp, next, &q->task_list) { // 遍历等待链表wait_queue_head_t *q

  wait_queue_t *curr;

  unsigned flags;

  curr = list_entry(tmp, wait_queue_t, task_list); // 从当前task_list地址中获取到wait_queue_t结构

  flags = curr->flags;

  if (curr->func(curr, mode, sync) && // 调用其设置的回调函数,这里如果是epoll,那么就是回调了ep_poll_callback函数

   (flags & WQ_FLAG_EXCLUSIVE) && // 如果指定了互斥唤醒,那么使用nr_exclusive来决定唤醒多少个进程(读者一定要注意这里的标志位,因为我们在ep_poll函数中添加等待进程时,可没有指定这个标志位哟,考虑下惊群?)

   !--nr_exclusive)

   break;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

接下来我们就可以直接来看ep_poll_callback回调函数的原理了。

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync){

 int pwake = 0;

 unsigned long flags;

 struct epitem *epi = EP_ITEM_FROM_WAIT(wait); // 从wait节点中获取到epitem,我们知道 eppoll_entry 中包含了该wait_queue_t wait结构,那么我们便可以根据wait的地址-wait相对于eppoll_entry结构的偏移量即可

 struct eventpoll *ep = epi->ep; // 从epitem中获取到关联的eventpoll结构

 write_lock_irqsave(&ep->lock, flags);

 // 如果该文件代表的epitem已经存在于eventpoll中,那么我们直接退出,因为我们前面直接调用了poll函数,考虑下如果数据在调用poll的时候就已经可用了呢?这时是不是在前面就将其添加到了eventpoll的rdllink准备好事件的链表中?

 if (EP_IS_LINKED(&epi->rdllink))

  goto is_linked;

 list_add_tail(&epi->rdllink, &ep->rdllist); // 添加到eventpoll的rdllink准备好事件的链表

 is_linked:

 // 唤醒所有等待eventpoll准备好事件的进程,这里就是唤醒了通过epoll_wait调用而阻塞的进程(注意,这里导致了惊群效应的发生,由于我们没有使用互斥唤醒标志WQ_FLAG_EXCLUSIVE,所以这里的wake_up函数将会唤醒所有在wq等待链表上的进程)

 if (waitqueue_active(&ep->wq))

  wake_up(&ep->wq);

 if (waitqueue_active(&ep->poll_wait))

  pwake++;

 write_unlock_irqrestore(&ep->lock, flags);

 if (pwake)

  ep_poll_safewake(&psw, &ep->poll_wait);

 return 1;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

ep_remove原理

该函数用于从epoll监听fd链表中移除对应的fd。其实我们看到只不过是insert的逆向过程罢了,我们做了哪些链接,那么就需要将其从对应的链表中摘除,然后将其从epoll结构中移除,最后释放epitem所占用的内存空间。详细描述如下。

static int ep_remove(struct eventpoll *ep, struct epitem *epi){

 int error;

 unsigned long flags;

 struct file *file = epi->file;

 ep_unregister_pollwait(ep, epi); // 移除所有epitem结构的pwqlist链表中,eppoll_entry结构添加到的wait等待节点,因为我们移除这个fd后不需要再监听回调了

 spin_lock(&file->f_ep_lock);

 if (EP_IS_LINKED(&epi->fllink)) // 随后将epitem结构从监听文件对象的链表中移除

  EP_LIST_DEL(&epi->fllink);

 spin_unlock(&file->f_ep_lock);

 write_lock_irqsave(&ep->lock, flags);

 error = ep_unlink(ep, epi); // 接着将epitem结构从epoll结构中移除

 write_unlock_irqrestore(&ep->lock, flags);

 if (error)

  goto eexit_1;

 ep_release_epitem(epi); // 释放epitem所占用的内存空间

 error = 0;

 eexit_1:

 DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_remove(%p, %p) = %d\n",

    current, ep, file, error));

 return error;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

ep_modify原理

该函数用于修改监听文件对象的感兴趣事件集。我们看到首先修改了感兴趣事件集合,然后调用修改fd的poll函数,该函数将会查看一下当前是否有准备好的事件集,然后我们进一步判断当前epitem是否被移除,如果没有那么根据当前事件集和epitem的状态来选择将其从eventpoll准备好的事件链表中放入还是移除。详细描述如下。

static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event){

 int pwake = 0;

 unsigned int revents;

 unsigned long flags;

 epi->event.events = event->events; // 更新感兴趣事件集

 revents = epi->file->f_op->poll(epi->file, NULL); // 回调一下修改fd的poll函数,注意此时传递的poll_table *wait 为NULL,所以此时并没有修改我们之前设置的回调函数,只是看看是否有事件发生

 write_lock_irqsave(&ep->lock, flags);

 epi->event.data = event->data; // 更新用户数据结构

 // 如果此时当前修改文件的epitem已经被删除,那么我们这里什么也做

 if (EP_IS_LINKED(&epi->llink)) {

  // 否则我们看看是否当前有准备好的事件,revents为我们通过修改文件的poll回调获取的事件集合

  if (revents & event->events) {

   if (!EP_IS_LINKED(&epi->rdllink)) {

    // 将当前epitem添加到epoll的准备好事件链表

    list_add_tail(&epi->rdllink, &ep->rdllist);

    // 唤醒等待进程

    if (waitqueue_active(&ep->wq))

     wake_up(&ep->wq);

    if (waitqueue_active(&ep->poll_wait))

     pwake++;

   }

  } else if (EP_IS_LINKED(&epi->rdllink)) // 如果当前epitem已经链接到了准备好事件中,但是由于我们已经修改了感兴趣事件集,那么需要将其从准备好的事件链表中移除

   EP_LIST_DEL(&epi->rdllink);

 }

 write_unlock_irqrestore(&ep->lock, flags);

 if (pwake)

  ep_poll_safewake(&psw, &ep->poll_wait);

 return 0;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

sys_epoll_wait原理

该函数用于实现对Epoll对象进行查询的操作。我们从epoll_wait函数中知道,struct epoll_event __user *events用于存放准备好的事件集,int maxevents用于指定大小,通常等于events的数组大小,同时 int timeout用于指明超时等待时间。我们说linux内核不相信任何用户空间的数据,所以对这些参数进行了详细校验。然后获取到eventpoll对象调用ep_poll函数完成事件获取。

在ep_poll函数中,我们看到首先计算超时时间,然后检测下准备好事件链表是否为空,若为空,那么创建进程等待节点放入到eventpoll对象的wq链表中,同时设置进程状态为TASK_INTERRUPTIBLE,然后检测下当前进程没有需要处理的信号,那么调用schedule_timeout使用进程调度器来选择其他进程执行(可能读者不熟悉Linux内核,是这样的,内核不同于用户空间,我们需要手动将控制权交由进程管理模块来调度,而这里的schedule_timeout函数便是进程调度模块的核心函数,该函数将使用我们熟知的进程调度器,使用调度算法选取下一个进程完成进程切换,后面的文章作者会详细的描述该切换过程)。由此我们可以得出结论,该等待进程的返回情况有三种:由ep_poll_callback回调函数唤醒、进程传递信号唤醒、超时时间到唤醒。如果有准备好的事件发生后,我们会调用ep_events_transfer函数构建txlist并将其复制到用户指定的内存空间中。

在ep_events_transfer函数中,我们看到首先调用ep_collect_ready_items函数将rdllist中的准备好的事件取出来放入txlist中,获取的数量由maxevents指定,然后我们调用ep_send_events将txlist复制到用户指定的 struct epoll_event __user *events空间中,该函数作者没有展开,因为没有必要,这里又会涉及到Linux的内存管理原理,比如copy_to_user函数的使用方式,这已经不属于Epoll的范畴,所以读者也不需要在本篇文章了解。最后调用了ep_reinject_items函数来看看是否需要将txlist中的事件重新放入到epoll的rdllist中。

在ep_reinject_items函数中,我们看到 EP_IS_LINKED(&epi->llink) && !(epi->event.events & EPOLLET) 核心判断逻辑,该判断看看如果epitem没有被删除且设置的事件处理类型不为EPOLLET(边缘触发),那么就会将其重新放入到rdllist中,最后判断如果重放入了,那么唤醒所有等待进程。那么问题就来了:很多人把这一步当做是惊群效应,但其实并不是。这正是Epoll的高性能的体现,大伙想想:如果返回的fd数量较大,而我们使用了水平触发,一个进程处理不过来,我是否可以多个进程来唤醒处理,然后由处理函数保证互斥即可,这何尝不是高性能的体现?而我们所需要注意的惊群在这个版本中确实有,存在于我们之前介绍的ep_poll_callback回调函数和ep_poll函数中,由于在添加进程等待节点时并没有指定WQ_FLAG_EXCLUSIVE标志位,从而在前面我们介绍的__wake_up_common函数中将不会响应nr_exclusive变量,尽管我们的wake_up函数的nr_exclusive为1,但是由于没有设置标志位,所以并不响应,前面详细描述过啦。函数详细实现如下所示。

long sys_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, int timeout){

 int error;

 struct file *file;

 struct eventpoll *ep;

 // 最大获取事件必须大于0

 if (maxevents <= 0)

  return -EINVAL;

 // 验证传入的存放准备好事件的内存区域合法性

 if ((error = verify_area(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))))

  goto eexit_1;

 error = -EBADF;

 file = fget(epfd); // 根据epoll fd获取到epoll文件对象

 if (!file)

  goto eexit_1;

 error = -EINVAL;

 if (!IS_FILE_EPOLL(file)) // 检查当前epoll file对象是否为epoll文件

  goto eexit_2;

 ep = file->private_data; // 获取到epoll文件对象的实际承载体:eventpoll结构

 error = ep_poll(ep, events, maxevents, timeout); // 调用该函数获取事件

 eexit_2:

 fput(file);

 eexit_1:

 DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %d\n",

    current, epfd, events, maxevents, timeout, error));

 return error;

}



// 获取准备好的事件集,如果指定了timeout,那么等待超时后返回

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, long timeout){

 int res, eavail;

 unsigned long flags;

 long jtimeout;

 wait_queue_t wait;

 // 计算超时时间。如果timeout为-1,那么超时时间戳为MAX_SCHEDULE_TIMEOUT 该值为long的最大值,否则我们根据内核的时钟频率HZ来计算,当前内核的频率为100HZ

 jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?

  MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;



 retry:

 write_lock_irqsave(&ep->lock, flags);

 res = 0;

 if (list_empty(&ep->rdllist)) { // 如果准备好的事件链表为空那么构建当前进程的等待节点结构wait_queue_t wait,并将其添加到eventpoll的wq等待链表

  init_waitqueue_entry(&wait, current);

  add_wait_queue(&ep->wq, &wait); // 注意:由于这里添加节点时没有指定互斥状态WQ_FLAG_EXCLUSIVE,从而在callback唤醒时会唤醒所有等待进程

  // 循环等待直到满足条件

  for (;;) {

   set_current_state(TASK_INTERRUPTIBLE); // 设置当前进程状态为TASK_INTERRUPTIBLE(可中断阻塞状态)

   if (!list_empty(&ep->rdllist) || !jtimeout) // 如果准备好事件集此时不为空,或者超时,那么退出循环

    break;

   if (signal_pending(current)) { // 当前进程有需要处理的信号,那么退出并设置返回值为EINTR

    res = -EINTR;

    break;

   }

   write_unlock_irqrestore(&ep->lock, flags);

   jtimeout = schedule_timeout(jtimeout); // 调用该函数让当前进程阻塞,并且将调用Linux内核的进程调度模块来选择其他进程运行

   write_lock_irqsave(&ep->lock, flags);

  }

  // 满足条件后将当前进程从eventpoll的wq等待链表中移除,并设置TASK_RUNNING的标志位表示当前进程处于运行状态

  remove_wait_queue(&ep->wq, &wait);

  set_current_state(TASK_RUNNING);

 }

 eavail = !list_empty(&ep->rdllist);

 write_unlock_irqrestore(&ep->lock, flags);

 // 如果当前epoll中存在准备好的事件集,那么我们调用ep_events_transfer函数将其复制到用户指定的events内存中,否则我们返回retry处继续尝试

 if (!res && eavail && // 如果指定了res那么直接返回res

  !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)

  goto retry;



 return res;

}



// 将准备好的事件传输到用户指定的内存空间events中

static int ep_events_transfer(struct eventpoll *ep,struct epoll_event __user *events, int maxevents){

 int eventcnt = 0;

 struct list_head txlist;

 INIT_LIST_HEAD(&txlist); // 初始化传输节点

 down_read(&ep->sem); 

 // 将rdllist准备好事件集的链表节点转移到txlist链表中,最大个数为maxevents

 if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {

  eventcnt = ep_send_events(ep, &txlist, events); // 将txlist传输到用户空间中

  ep_reinject_items(ep, &txlist); // 根据Epoll指定的LT和ET模式来选择是否重新将txlist放入到rdllist中

 }

 up_read(&ep->sem);

 return eventcnt;

}



// 根据Epoll指定的LT和ET模式来选择是否重新将txlist放入到rdllist中

static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist){

 int ricnt = 0, pwake = 0;

 unsigned long flags;

 struct epitem *epi;

 write_lock_irqsave(&ep->lock, flags);

 while (!list_empty(txlist)) { // 循环直到txlist链表为空

  epi = list_entry(txlist->next, struct epitem, txlink); // 根据txlist的地址获取到所属epitem结构

  EP_LIST_DEL(&epi->txlink);

  if (EP_IS_LINKED(&epi->llink) && !(epi->event.events & EPOLLET) && // epitem没有被删除且设置的事件处理类型不为EPOLLET(边缘触发)

   (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {

   // 那么将tx中的epitem重新放入rdllist准备好事件集链表中

   list_add_tail(&epi->rdllink, &ep->rdllist);

   ricnt++; // 计算重放入次数

  }

 }

 // 如果已经重放入准备好事件集,那么唤醒所有等待节点

 if (ricnt) {

  if (waitqueue_active(&ep->wq))

   wake_up(&ep->wq);

  if (waitqueue_active(&ep->poll_wait))

   pwake++;

 }

 write_unlock_irqrestore(&ep->lock, flags);

 if (pwake)

  ep_poll_safewake(&psw, &ep->poll_wait);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223