# Netty 核心原理十一 SingleThreadEventLoop 原理

OioEventLoopGroup 原理

该类非常简单,实际没有完成任何操作,仅仅只是在构造函数中初始化父类。主要用于在命名上符合Netty对于事件循环组的定义规范。比如:NIO的为:NioEventLoopGroup、BIO:OioEventLoopGroup。源码描述如下。

public class OioEventLoopGroup extends ThreadPerChannelEventLoopGroup {

 

 public OioEventLoopGroup() {

  this(0);

 }



 public OioEventLoopGroup(int maxChannels) {

  this(maxChannels, Executors.defaultThreadFactory());

 }



 public OioEventLoopGroup(int maxChannels, Executor executor) {

  super(maxChannels, executor);

 }



 public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {

  super(maxChannels, threadFactory);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

SingleThreadEventLoop 原理

该类表示一个单线程执行的事件循环对象,同样大部分操作已经在SingleThreadEventExecutor中实现。这里主要添加了自己的Queue<Runnable> tailTasks任务队列的处理,同时实现了EventLoop中register函数的实现。

核心变量与构造器

从核心变量和构造器中我们看到,该类主要新增的操作为Queue<Runnable> tailTasks队列操作。源码描述如下。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {



 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); // tailTasks队列中最大放置的任务长度

 

 private final Queue<Runnable> tailTasks; // 放置tail任务的队列。也即在taskQueue普通任务队列执行后,需要执行的任务



 // 构造方法:设置了默认为DEFAULT_MAX_PENDING_TASKS和拒绝函数

 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

  this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());

 }



 // 构造方法:初始化父类同时创建tailTasks队列

 protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,

         boolean addTaskWakesUp, int maxPendingTasks,

         RejectedExecutionHandler rejectedExecutionHandler) {

  super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

  tailTasks = newTaskQueue(maxPendingTasks);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

核心方法next原理

该方法直接调用父类的next方法来获取下一个EventLoop对象。源码描述如下。

public EventLoop next() {

 return (EventLoop) super.next();

}
1
2
3
4
5

核心方法executeAfterEventLoopIteration原理

该方法用于添加任务到tailTasks队列中。流程如下:

  1. 检查EventLoop是否已经关闭,如果关闭,那么抛出任务拒绝异常
  2. 尝试将任务放入tailTasks中,如果达到最大长度,导致放入失败,那么抛出任务拒绝异常
  3. 根据设置唤醒工作任务线程
public final void executeAfterEventLoopIteration(Runnable task) {

 ObjectUtil.checkNotNull(task, "task");

 if (isShutdown()) { // 检查EventLoop是否已经关闭

  reject();

 }



 if (!tailTasks.offer(task)) { // 尝试将任务放入tailTasks中

  reject(task);

 }



 if (wakesUpForTask(task)) { // 根据设置唤醒工作任务线程

  wakeup(inEventLoop());

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

核心方法afterRunningAllTasks原理

在SingleThreadEventExecutor类的runAllTasks方法中,我们看到该方法将作为钩子函数来执行,当所有taskQueue中的任务都执行完毕后,会回调该钩子函数。我们看到这里重写了该方法,将执行tailTasks队列中的任务。源码描述如下。

protected void afterRunningAllTasks() {

 runAllTasksFrom(tailTasks);

}
1
2
3
4
5

核心方法register原理

register方法用于将Channel注册到当前EventLoop中。我们看到这里将封装ChannelPromise对象后,调用Channel的unsafe对象的register方法来完成注册。源码描述如下。

public ChannelFuture register(Channel channel) {

 return register(new DefaultChannelPromise(channel, this));

}



@Override

public ChannelFuture register(final ChannelPromise promise) {

 ObjectUtil.checkNotNull(promise, "promise");

 promise.channel().unsafe().register(this, promise); // 注册通道

 return promise;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

核心方法hasTasks原理

该方法用于判断是否有任务需要执行。注意这里需要判断父类的taskQueue和当前tailTasks。源码描述如下。

@Override

protected boolean hasTasks() {

 return super.hasTasks() || !tailTasks.isEmpty();

}
1
2
3
4
5
6
7

核心方法pendingTasks原理

该方法用于获取当前需要执行的任务个数。可以看到任务个数等于:父类taskQueue的任务数+tailTasks的任务数。源码描述如下。

@Override

public int pendingTasks() {

 return super.pendingTasks() + tailTasks.size();

}
1
2
3
4
5
6
7