# Netty 核心原理十一 SingleThreadEventLoop 原理
OioEventLoopGroup 原理
该类非常简单,实际没有完成任何操作,仅仅只是在构造函数中初始化父类。主要用于在命名上符合Netty对于事件循环组的定义规范。比如:NIO的为:NioEventLoopGroup、BIO:OioEventLoopGroup。源码描述如下。
public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {
public OioEventLoopGroup() {
this(0);
}
public OioEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
public OioEventLoopGroup(int maxChannels, Executor executor) {
super(maxChannels, executor);
}
public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {
super(maxChannels, threadFactory);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
SingleThreadEventLoop 原理
该类表示一个单线程执行的事件循环对象,同样大部分操作已经在SingleThreadEventExecutor中实现。这里主要添加了自己的Queue<Runnable> tailTasks
任务队列的处理,同时实现了EventLoop中register函数的实现。
核心变量与构造器
从核心变量和构造器中我们看到,该类主要新增的操作为Queue<Runnable> tailTasks
队列操作。源码描述如下。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); // tailTasks队列中最大放置的任务长度
private final Queue<Runnable> tailTasks; // 放置tail任务的队列。也即在taskQueue普通任务队列执行后,需要执行的任务
// 构造方法:设置了默认为DEFAULT_MAX_PENDING_TASKS和拒绝函数
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
// 构造方法:初始化父类同时创建tailTasks队列
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
核心方法next原理
该方法直接调用父类的next方法来获取下一个EventLoop对象。源码描述如下。
public EventLoop next() {
return (EventLoop) super.next();
}
2
3
4
5
核心方法executeAfterEventLoopIteration原理
该方法用于添加任务到tailTasks队列中。流程如下:
- 检查EventLoop是否已经关闭,如果关闭,那么抛出任务拒绝异常
- 尝试将任务放入tailTasks中,如果达到最大长度,导致放入失败,那么抛出任务拒绝异常
- 根据设置唤醒工作任务线程
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) { // 检查EventLoop是否已经关闭
reject();
}
if (!tailTasks.offer(task)) { // 尝试将任务放入tailTasks中
reject(task);
}
if (wakesUpForTask(task)) { // 根据设置唤醒工作任务线程
wakeup(inEventLoop());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
核心方法afterRunningAllTasks原理
在SingleThreadEventExecutor类的runAllTasks方法中,我们看到该方法将作为钩子函数来执行,当所有taskQueue中的任务都执行完毕后,会回调该钩子函数。我们看到这里重写了该方法,将执行tailTasks队列中的任务。源码描述如下。
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
2
3
4
5
核心方法register原理
register方法用于将Channel注册到当前EventLoop中。我们看到这里将封装ChannelPromise对象后,调用Channel的unsafe对象的register方法来完成注册。源码描述如下。
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise); // 注册通道
return promise;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
核心方法hasTasks原理
该方法用于判断是否有任务需要执行。注意这里需要判断父类的taskQueue和当前tailTasks。源码描述如下。
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
2
3
4
5
6
7
核心方法pendingTasks原理
该方法用于获取当前需要执行的任务个数。可以看到任务个数等于:父类taskQueue的任务数+tailTasks的任务数。源码描述如下。
@Override
public int pendingTasks() {
return super.pendingTasks() + tailTasks.size();
}
2
3
4
5
6
7