# Netty 核心原理六 SingleThreadEventExecutor 原理一
SingleThreadEventExecutor 原理
AbstractScheduledEventExecutor实现了周期性任务调度的支持,该类继承自AbstractScheduledEventExecutor,同时提供了在单线程中执行所有提交任务的功能方法。
核心变量与构造器
通过这些变量和构造器我们知道如下几点:
- SingleThreadEventExecutor中有执行状态state
- 有一个任务执行队列
Queue<Runnable> taskQueue
,默认为长度为maxPendingTasks的LinkedBlockingQueue - 有一个
Set<Runnable> shutdownHooks
,用于在关闭时执行其中的Runnable动作 - 有一个DefaultThreadProperties内部类实现了ThreadProperties,该接口将提供给外部获取线程的属性,我们看到默认的线程属性仅仅只是包装了Thread对象中的方法
// 标识接口,用于表明执行提交的任务拥有着顺序性
public interface OrderedEventExecutor extends EventExecutor {
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 最大挂起执行的任务。可以看到这里最小为16,默认为整形最大值
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
// 执行器状态
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
// 用于唤醒和执行空操作的空任务
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
// JUC的基础。用于封装state变量和threadProperties变量的原子性修改操作
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER;
private final Queue<Runnable> taskQueue; // 任务队列
private volatile Thread thread; // 执行线程对象
private volatile ThreadProperties threadProperties; // 线程属性
private final Executor executor; // 执行器对象
private volatile boolean interrupted; // 标识线程是否被中断
private final Semaphore threadLock = new Semaphore(0); // 线程锁信号量
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); // 在关闭时执行的操作链表
private final boolean addTaskWakesUp; // 是否允许添加任务唤醒线程
private final int maxPendingTasks; // 最大挂起的任务
private final RejectedExecutionHandler rejectedExecutionHandler; // 拒绝函数
private volatile int state = ST_NOT_STARTED; // 当前执行器状态
private volatile long gracefulShutdownQuietPeriod; // 静默期持续事件
private volatile long gracefulShutdownTimeout; // 关闭超时时间
private long gracefulShutdownStartTime; // 关闭开始时间
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE); // 用于外部线程等待关闭的Promise
// 构造器:指定了默认的ThreadPerTaskExecutor执行器
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
// 构造器:指定了默认的最大挂起执行任务和拒绝函数:抛出任务拒绝异常
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
// 最终构造器
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent); // 初始化执行器组
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks); // 创建任务队列
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
// 默认为指定数量的LinkedBlockingQueue
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
// 内部类用于获取当前执行器所属线程对象的属性
private static final class DefaultThreadProperties implements ThreadProperties {
private final Thread t;
DefaultThreadProperties(Thread t) {
this.t = t;
}
@Override
public State state() {
return t.getState();
}
@Override
public int priority() {
return t.getPriority();
}
@Override
public boolean isInterrupted() {
return t.isInterrupted();
}
@Override
public boolean isDaemon() {
return t.isDaemon();
}
@Override
public String name() {
return t.getName();
}
@Override
public long id() {
return t.getId();
}
@Override
public StackTraceElement[] stackTrace() {
return t.getStackTrace();
}
@Override
public boolean isAlive() {
return t.isAlive();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
核心方法execute实现原理
我们知道核心的方法便是execute,因为Executor接口定义了如何执行任务,子类将会通过该方法向事件执行器中添加执行的任务。我们看到处理流程如下:
- 检测任务是否为空
- 根据当前线程是否为当前执行器的执行线程来选择是否直接添加任务,还是调用startThread尝试启动该事件执行器的线程来执行任务,然后将任务添加到任务队列中。我们知道,如果线程还未启动的话,那么无法执行任务,所以我们需要尝试startThread,那么就需要该方法允许重入。同时,由于我们的任务队列是LinkedBlockingQueue,那么天生线程安全,所以可以多任务来操作
- 判断当前事件执行器是否已经关闭,从而将任务从队列中移除,同时抛出拒绝异常
- 如果设置addTaskWakesUp为false,那么根据wakesUpForTask方法的返回值来选择是否唤醒线程
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 根据当前线程是否为当前执行器的执行线程来选择是否直接添加任务,还是调用startThread尝试启动该事件执行器的线程来执行任务,然后将任务添加到任务队列中
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread(); // 启动任务线程
addTask(task); // 将任务添加到任务队列
if (isShutdown() && removeTask(task)) { // 判断当前事件执行器是否已经关闭,从而将任务从队列中移除,同时抛出拒绝异常
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) { // 如果设置addTaskWakesUp为false,那么根据wakesUpForTask方法的返回值来选择是否唤醒线程
wakeup(inEventLoop);
}
}
// 该方法当前版本总是返回true
protected boolean wakesUpForTask(Runnable task) {
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
核心方法addTask实现原理
该方法相对简单,判断任务为空后,调用offerTask方法将任务添加到队列,如果添加失败,那么抛出任务拒绝异常。在offerTask中,将会判断事件执行器状态,从而看看是否拒绝或者添加任务。
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
核心方法startThread实现原理
该方法用于启动事件执行器的线程。流程如下:
- 尝试原子性的将状态state从ST_NOT_STARTED修改为ST_STARTED,如果成功,那么调用doStartThread启动线程
- 创建Runnable任务执行
- 在Runnable中获取当前线程并检测中断,如果设置了中断标记位,那么中断当前线程
- 更新最后执行任务的时间
- 执行子类需要重写的run方法:SingleThreadEventExecutor.this.run()。通过try catch捕捉所有子类执行run的异常。保证当前线程不会因为子类的异常而退出
- 当执行完成后需要将执行器状态修改为ST_SHUTTING_DOWN
- 执行confirmShutdown方法,运行所有剩余的任务和关闭钩子函数
- 调用cleanup钩子函数执行资源清理操作
- 修改事件执行器状态为终止状态,同时释放线程锁信号量
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 原子性更新state
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() { // 直接通过执行器执行任务
@Override
public void run() {
// 获取当前线程并检测中断,如果设置了中断标记位,那么中断当前线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime(); // 更新最后执行任务的时间,也即获取当前时间对lastExecutionTime进行赋值
try { // 执行子类需要重写的run方法。通过try catch捕捉所有子类执行run的异常。保证当前线程不会因为子类的异常而退出
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 当执行完成后需要将执行器状态修改为ST_SHUTTING_DOWN,由此可见,上述的run方法一旦返回,那么当前执行器就等同于关闭,这时很容易猜到子类的SingleThreadEventExecutor.this.run()便是处理业务的核心操作
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// 如果成功执行SingleThreadEventExecutor.this.run(),而不是因为抛出异常而退出,那么这时需要检测gracefulShutdownStartTime是否为0,如果此时gracefulShutdownStartTime为0,那么表明子类没有调用confirmShutdown()方法,这时触发一个错误日志
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// 执行confirmShutdown方法,运行所有剩余的任务和关闭钩子函数
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup(); // 执行资源清理操作,子类可以实现该钩子函数来完成资源释放
} finally {
// 修改事件执行器状态为终止状态,同时释放线程锁信号量
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) { // 任务队列不为空,那么触发警告日志
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null); // 设置终止Future成功完成
}
}
}
}
});
}
// 子类重写该方法执行处理操作
protected abstract void run();
// 子类实现该钩子方法完成自己的资源清理
protected void cleanup() {
// NOOP
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153