# Netty 核心原理九 NioEventLoopGroup与EventLoopGroup 原理

在SingleThreadEventExecutor中我们看到,当我们执行execute方法时会将任务放入到普通任务队列中,同时调用doStartThread方法启动执行线程,而执行线程将会调用子类实现的run方法,完成执行器任务的执行。那么很容易想到,该类作为默认执行器,那么将会重写run方法完成自身的逻辑,而其他所有的操作,都已经在SingleThreadEventExecutor中完成。所以我们这里直接看run方法和构造函数实现即可。

public final class DefaultEventExecutor extends SingleThreadEventExecutor { 

 // 构造器:通过指定执行器初始化父类

 public DefaultEventExecutor(Executor executor) {

  this(null, executor);

 }



 // 构造器:通过指定事件执行器组初始化父类

 public DefaultEventExecutor(EventExecutorGroup parent) {

  this(parent, new DefaultThreadFactory(DefaultEventExecutor.class));

 }



 // 构造器:通过指定事件执行器组和线程工厂初始化父类

 public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {

  super(parent, threadFactory, true);

 }

 

 // 重写父类的run方法,在该方法中不断循环执行任务队列中的任务

 protected void run() {

  for (;;) {

   // 从队列中获取任务执行,并且更新最后任务执行时间

   Runnable task = takeTask();

   if (task != null) {

    task.run();

    updateLastExecutionTime(); 

   }

   if (confirmShutdown()) { // 确认是否需要关闭事件执行器,如果关闭,那么直接退出循环结束任务执行

    break;

   }

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

EventLoopGroup 原理

EventLoopGroup 是一个特殊的 EventExecutorGroup 事件执行器组,这里笔者将其翻译为事件循环组。它在EventExecutorGroup的基础上增加了对Channel通道的注册操作。源码描述如下。

public interface EventLoopGroup extends EventExecutorGroup {

 /**

 * 返回事件循环组中的下一个事件循环对象

 */

 @Override

 EventLoop next();



 /**

 * 将channel注册到事件循环组中,返回的ChannelFuture表明执行结果

 */

 ChannelFuture register(Channel channel);



 /**

 * 使用ChannelPromise来注册通道,这里读者只需要知道是将channel通道包装在ChannelPromise中即可

 */

 ChannelFuture register(ChannelPromise promise);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

EventLoop 原理

EventLoop 表示 EventLoopGroup 中的事件循环,我们看到该接口是一个特殊的EventLoopGroup,很明显,又是一个只有自己的组。所以该组将处理所有注册到其中的channel操作,同时由于其包含在EventLoopGroup中,所以增加了一个用于获取所属事件循环组的parent方法。源码描述如下。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {

 @Override

 EventLoopGroup parent();

}
1
2
3
4
5
6
7

MultithreadEventLoopGroup 原理

MultithreadEventLoopGroup表示一个多线程处理的事件循环组,实现相当简单,因为大部分操作都在MultithreadEventExecutorGroup抽象类中实现了,这里只不过对register方法进行了默认实现。可以看到,所有操作均是调用next方法来获取事件循环组中的下一个EventLoop事件循环来处理。所以更加能表现了MultithreadEventLoopGroup仅仅只是管理了EventLoop,真正干活的是EventLoop。源码描述如下。

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

 private static final int DEFAULT_EVENT_LOOP_THREADS; // 默认事件循环线程个数

 static {

  // 默认取 CPU核心数 * 2

  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

   "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

 }



 // 构造器:如果线程数为0,那么将使用默认线程数

 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {

  super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

 }



 // 默认线程工厂,指定优先级为最高优先级

 @Override

 protected ThreadFactory newDefaultThreadFactory() {

  return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);

 }



 // 直接调用父类的Chooser来完成下一个事件循环对象的选择

 @Override

 public EventLoop next() {

  return (EventLoop) super.next();

 }



 // 由子类完成对事件循环对象的创建

 @Override

 protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;



 

 /****************所有操作均由事件循环对象来完成****************/

 @Override

 public ChannelFuture register(Channel channel) {

  return next().register(channel);

 }



 @Override

 public ChannelFuture register(ChannelPromise promise) {

  return next().register(promise);

 }



 @Deprecated

 @Override

 public ChannelFuture register(Channel channel, ChannelPromise promise) {

  return next().register(channel, promise);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

DefaultEventLoopGroup 原理

DefaultEventLoopGroup为默认的事件循环组实现,同样较为简单:既然是默认事件循环组,那么真实处理的肯定也是默认的事件循环对象,DefaultEventLoop。源码描述如下。

public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {



 /**

 * 使用默认的线程数

 */

 public DefaultEventLoopGroup() {

  this(0);

 }



 /**

 * 指定线程数创建

 */

 public DefaultEventLoopGroup(int nThreads) {

  this(nThreads, null);

 }



 /**

 * 指定线程数和线程工厂

 */

 public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {

  super(nThreads, threadFactory);

 }



 // 事件循环对象为默认事件循环

 @Override

 protected EventLoop newChild(Executor executor, Object... args) throws Exception {

  return new DefaultEventLoop(this, executor);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

NioEventLoopGroup 原理

NioEventLoopGroup用于处理NIO Channel操作的事件循环组。我们看到这里创建的子事件循环为NioEventLoop,同理这里由于是操作NIO的事件,所以需要初始化SelectorProvider来创建Selector实例。该类也是在生产环境中常用的事件循环组,但是实现也较为简单,因为大量的操作也是在MultithreadEventLoopGroup中完成。源码描述如下。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {



 /**

 * 构造器:初始化SelectorProvider,该对象将初始化Selector选择器

 */

 public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {

  this(nThreads, threadFactory, SelectorProvider.provider());

 }



 

 /**

 * 构造器:使用默认的选择策略(作者:黄俊,微信:bx_java)DefaultSelectStrategyFactory.INSTANCE,该选择策略将会在EventLoop事件循环对象中处理Selector通道选择时使用

 */

 public NioEventLoopGroup(

  int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {

  this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);

 }



 // 使用默认拒绝策略初始化父类

 public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,

  final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {

  super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());

 }

 /**

 * 设置事件循环处理线程,在处理Channel IO的时间占比和执行普通任务的时间占比,默认为 50%

 */

 public void setIoRatio(int ioRatio) {

  for (EventExecutor e: this) {

   ((NioEventLoop) e).setIoRatio(ioRatio);

  }

 }



 /**

 * 用于修复JDK的epoll 100% CPU的bug(在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。了解下即可),这里采取的办法就是让所有的事件循环重新创建Selector对象

 */

 public void rebuildSelectors() {

  for (EventExecutor e: this) {

   ((NioEventLoop) e).rebuildSelector();

  }

 }



 // 创建的事件循环对象为NioEventLoop,这里将传递给事件循环组的SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler传递给了事件循环对象

 @Override

 protected EventLoop newChild(Executor executor, Object... args) throws Exception {

  return new NioEventLoop(this, executor, (SelectorProvider) args[0],

        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95