# Netty 核心原理九 NioEventLoopGroup与EventLoopGroup 原理
在SingleThreadEventExecutor中我们看到,当我们执行execute方法时会将任务放入到普通任务队列中,同时调用doStartThread方法启动执行线程,而执行线程将会调用子类实现的run方法,完成执行器任务的执行。那么很容易想到,该类作为默认执行器,那么将会重写run方法完成自身的逻辑,而其他所有的操作,都已经在SingleThreadEventExecutor中完成。所以我们这里直接看run方法和构造函数实现即可。
public final class DefaultEventExecutor extends SingleThreadEventExecutor {
// 构造器:通过指定执行器初始化父类
public DefaultEventExecutor(Executor executor) {
this(null, executor);
}
// 构造器:通过指定事件执行器组初始化父类
public DefaultEventExecutor(EventExecutorGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventExecutor.class));
}
// 构造器:通过指定事件执行器组和线程工厂初始化父类
public DefaultEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
// 重写父类的run方法,在该方法中不断循环执行任务队列中的任务
protected void run() {
for (;;) {
// 从队列中获取任务执行,并且更新最后任务执行时间
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) { // 确认是否需要关闭事件执行器,如果关闭,那么直接退出循环结束任务执行
break;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
EventLoopGroup 原理
EventLoopGroup 是一个特殊的 EventExecutorGroup 事件执行器组,这里笔者将其翻译为事件循环组。它在EventExecutorGroup的基础上增加了对Channel通道的注册操作。源码描述如下。
public interface EventLoopGroup extends EventExecutorGroup {
/**
* 返回事件循环组中的下一个事件循环对象
*/
@Override
EventLoop next();
/**
* 将channel注册到事件循环组中,返回的ChannelFuture表明执行结果
*/
ChannelFuture register(Channel channel);
/**
* 使用ChannelPromise来注册通道,这里读者只需要知道是将channel通道包装在ChannelPromise中即可
*/
ChannelFuture register(ChannelPromise promise);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
EventLoop 原理
EventLoop 表示 EventLoopGroup 中的事件循环,我们看到该接口是一个特殊的EventLoopGroup,很明显,又是一个只有自己的组。所以该组将处理所有注册到其中的channel操作,同时由于其包含在EventLoopGroup中,所以增加了一个用于获取所属事件循环组的parent方法。源码描述如下。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
2
3
4
5
6
7
MultithreadEventLoopGroup 原理
MultithreadEventLoopGroup表示一个多线程处理的事件循环组,实现相当简单,因为大部分操作都在MultithreadEventExecutorGroup抽象类中实现了,这里只不过对register方法进行了默认实现。可以看到,所有操作均是调用next方法来获取事件循环组中的下一个EventLoop事件循环来处理。所以更加能表现了MultithreadEventLoopGroup仅仅只是管理了EventLoop,真正干活的是EventLoop。源码描述如下。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS; // 默认事件循环线程个数
static {
// 默认取 CPU核心数 * 2
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
// 构造器:如果线程数为0,那么将使用默认线程数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 默认线程工厂,指定优先级为最高优先级
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
// 直接调用父类的Chooser来完成下一个事件循环对象的选择
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
// 由子类完成对事件循环对象的创建
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
/****************所有操作均由事件循环对象来完成****************/
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
DefaultEventLoopGroup 原理
DefaultEventLoopGroup为默认的事件循环组实现,同样较为简单:既然是默认事件循环组,那么真实处理的肯定也是默认的事件循环对象,DefaultEventLoop。源码描述如下。
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* 使用默认的线程数
*/
public DefaultEventLoopGroup() {
this(0);
}
/**
* 指定线程数创建
*/
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, null);
}
/**
* 指定线程数和线程工厂
*/
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
// 事件循环对象为默认事件循环
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
NioEventLoopGroup 原理
NioEventLoopGroup用于处理NIO Channel操作的事件循环组。我们看到这里创建的子事件循环为NioEventLoop,同理这里由于是操作NIO的事件,所以需要初始化SelectorProvider来创建Selector实例。该类也是在生产环境中常用的事件循环组,但是实现也较为简单,因为大量的操作也是在MultithreadEventLoopGroup中完成。源码描述如下。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* 构造器:初始化SelectorProvider,该对象将初始化Selector选择器
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
/**
* 构造器:使用默认的选择策略(作者:黄俊,微信:bx_java)DefaultSelectStrategyFactory.INSTANCE,该选择策略将会在EventLoop事件循环对象中处理Selector通道选择时使用
*/
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// 使用默认拒绝策略初始化父类
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
/**
* 设置事件循环处理线程,在处理Channel IO的时间占比和执行普通任务的时间占比,默认为 50%
*/
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
/**
* 用于修复JDK的epoll 100% CPU的bug(在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。了解下即可),这里采取的办法就是让所有的事件循环重新创建Selector对象
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
// 创建的事件循环对象为NioEventLoop,这里将传递给事件循环组的SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler传递给了事件循环对象
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95