# Netty 核心原理十二 NioEventLoop 原理一
前面我们描述了NioEventLoopGroup,实际上它就是一个管理者,用于管理NioEventLoop 的生命周期:创建并传递设置的参数到NioEventLoop 、关闭NioEventLoop 。所以实际干活的就是NioEventLoop ,所以它非常的重要,同时也相对复杂,该类是业务开发中经常使用到的EventLoop。
核心变量与构造器
我们知道在Java中实现IO多路复用的就是Selector,那么该类必然需要与该类进行耦合。我们可以从变量和构造器中了解如下几点:
- 在构造器中创建了Selector对象
- 同时对原生的selectedKeys和publicSelectedKeys进行了增强,至于增强了什么,读者可以这么理解:
- JDK原生自带的放置已经选择集的类型为HashSet
- 那么操作HashSet,就需要计算hash值,同时在遍历时需要使用迭代器对象
- 而这里增强的集合就是一个数组,我们不需要使用迭代器同时也不需要计算hash值(内部使用数组下标操作)
- 由于内部实现就是两个数组进行切换操作,所以笔者这里将其省略,感兴趣的读者可以打开源码一览究竟。
public final class NioEventLoop extends SingleThreadEventLoop {
private static final int CLEANUP_INTERVAL = 256; // 默认清理周期
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); // 标识是否对Selector进行优化,默认为false
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; // 在执行select操作时,执行循环的次数(后面在描述select方法时将会详细介绍)
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; // 自动重新创建Selector的阈值(用于避免CPU 100%的bug)
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
}; // 用于封装selectNow的Supplier
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return NioEventLoop.super.pendingTasks();
}
}; // 用于封装获取挂起的任务数的Callable
// 静态代码块,用于初始化静态变量:SELECTOR_AUTO_REBUILD_THRESHOLD
static {
String key = "sun.nio.ch.bugLevel";
try {
String buglevel = SystemPropertyUtil.get(key); // 获取设置的JDK bug等级,通常我们不设置该变量
if (buglevel == null) {
System.setProperty(key, "");
}
} catch (SecurityException e) {
}
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512); // 默认自动重建Selector阈值为512
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) { // 如果小于最小selector返回事件个数,那么设置为0
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
Selector selector; // JDK的Selector对象
private SelectedSelectionKeySet selectedKeys; // 已经选择的SelectionKey
private final SelectorProvider provider; // 用于创建Selector对象的提供器
private final AtomicBoolean wakenUp = new AtomicBoolean(); // 用于控制是否调用Selector.select()对线程阻塞
private final SelectStrategy selectStrategy; // 选择策略。用于控制执行事件时的方法,比如执行select,或者selectNow
private volatile int ioRatio = 50; // 用于控制线程在执行Channel IO操作的时间和执行任务队列的事件时间占比,默认可以看到各占一半
private int cancelledKeys; // 用于保存取消的SelectionKey个数
private boolean needsToSelectAgain; // 用于控制是否应该重新进行select操作
// 构造方法:初始化父类,这里设置最大挂起任务为 DEFAULT_MAX_PENDING_TASKS
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector(); // 创建Selector实例
selectStrategy = strategy;
}
// 创建Selector实例
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector(); // 直接通过提供器打开Selector
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) { // 如果关闭优化,那么直接返回
return selector;
}
try { // 否则创建SelectedSelectionKeySet对象
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
// 确保当前选择器的实现为SelectorImpl
if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
return selector;
}
// 获取SelectorImpl类的selectedKeys(已经选择的SelectionKey)和publicSelectedKeys(SelectionKey的视图,只允许被移除,但是不允许被添加),并将它们设置为SelectedSelectionKeySet selectedKeySet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
selectedKeys = selectedKeySet;
} catch (Throwable t) {
selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
}
return selector;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
核心方法run原理
我们在前面知道,事件循环线程将会执行子类复写的run方法,所以该方法将会是分析这个类的起点。执行流程如下:
- 根据selectNowSupplier和任务队列有无任务来计算将要进行的操作
- 根据设置的ioRatio(默认为50%)来执行Selector中注册Channel IO事件和任务队列中的任务(taskQueue、ScheduledTaskQueue、tailTasks)
- 检测事件循环是否已经关闭,如果关闭,那么将所有Selectionkey取消关闭
// 选择策略接口
public interface SelectStrategy {
/**
* 表明需要执行一个阻塞select操作
*/
int SELECT = -1;
/**
* 表明执行一个非阻塞的selectNow操作
*/
int CONTINUE = -2;
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
// 我们这里看默认选择策略即可
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy(); // 单例模式
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; // 如果有任务,那么直接返回selectNow的准备好的事件个数,否则返回SELECT,表示执行阻塞的选择操作
}
}
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 根据selectNowSupplier和任务队列有无任务来计算将要进行的操作
case SelectStrategy.CONTINUE: // 如果为CONTINUE,那么继续循环
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // 执行select选择操作,注意这里将把wakenUp设置为false,并使用原来的值来进行select
if (wakenUp.get()) { // 如果设置了唤醒,那么执行唤醒方法(该方法的调用将会导致下一次执行select阻塞操作时立即返回)
selector.wakeup();
}
default: // 默认不进行选择
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) { // 如果设置100%的处理IO通道事件,那么首先执行SelectedKey然后再执行任务队列中的任务
processSelectedKeys();
runAllTasks();
} else {// 否则计算processSelectedKeys执行IO事件的时间,然后设置执行任务队列的时间
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 可以看到,这里ioRatio为50,那么假如执行了IO事件的时间为100ms,那么此时:100ms * (100-50) / 50 = 100ms 正好一半一半
}
if (isShuttingDown()) { // 检测事件循环是否已经关闭,如果关闭,那么将所有Selectionkey取消关闭
closeAll();
if (confirmShutdown()) { // 确认没任务执行且状态正确,那么结束循环
break;
}
}
} catch (Throwable t) { // 捕捉所有移除进行日志记录
logger.warn("Unexpected exception in the selector loop.", t);
// 避免可能出现的连续的抛出异常导致CPU占用过多
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
核心方法select原理
该方法用于执行Selector的select函数,来获取注册到Selector中准备好的事件,参数oldWakenUp表示之前wakeup的旧值。流程如下:
- 通过周期性任务调度超时时间来计算select timeout超时时间
- 检测超时
- 检测任务队列中是否存在任务执行,如果存在,那么执行selectNow,避免任务队列中的任务得不到执行
- 执行超时selector.select(timeoutMillis)等待
- 判断是否:用户唤醒当前线程、队列中是否存在任务、是否有通道事件准备就绪,从而结束当前循环
- 根据selectCnt来检测是否发生selector空转的bug,对Selector重新构建
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0; // 计算for循环次数,用于与MIN_PREMATURE_SELECTOR_RETURNS变量进行日志分析(读者注意这里的selectCnt用于记录的是:在超时时间内select函数被唤醒的次数,详细参考以下代码)
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 计算select timeout超时时间(读者可以想想这么怎么计算?考虑下周期性调度任务的超时?)
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 计算超时时间毫秒数 1000000L 为纳秒转为毫秒,前面的+ 500000L是为了对计算的毫秒数向上取整
if (timeoutMillis <= 0) { // 超时时间不存在,那么检测是否是第一次循环(通过selectCnt),如果是,那么直接执行selectNow即可
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 任务队列中存在任务(taskqueue或者tailTask)且cas设置wakeup标志位为true成功,那么执行selectNow后结束循环
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis); // 执行超时select等待
selectCnt ++; // 自增循环等待次数
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - 已经由通道事件产生
// - 外部函数设置了唤醒操作
// - 任务队列中存在任务
// - 存在可以周期性调度任务
// 那么结束循环
break;
}
if (Thread.interrupted()) { // 线程被中断,那么记录日志结束循环
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 达到超时时间,那么重置selectCnt(再次强调:selectCnt记录的是超时时间内select函数返回循环的次数)
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 设置了重新构建Selector的阈值,同时当前循环次数大于了SELECTOR_AUTO_REBUILD_THRESHOLD,那么此时判定出现了selector空转的bug。那么记录日志重新创建selector
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// 上面我们重新构建了selector,那么立即执行一次selectNow来更新selectedKeys集合
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { // 循环次数大于设置的PREMATURE次数,那么记录日志
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); // 如果没有周期性任务,那么默认进行1s的超时等待
// 计算select timeout超时时间
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169