# Netty 核心原理一 基础复习
Executor 原理
该接口为JDK的JUC顶层接口。定义了一个用于执行提交的Runnable接口实例的执行器对象。该接口的主要目的用于解耦需要执行的Runnable任务与调用者之间的关联,子类将负责处理Runnable的执行方式:单线程执行、多线程执行、周期性调度执行等待。通常我们使用该接口的实例来执行任务,而不是直接创建线程来处理:
new Thread(new(RunnableTask())).start(); // 不推荐
// 推荐方式
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
2
3
4
5
6
7
8
9
10
11
接口定义源码如下所示。读者可以想想,为何不直接使用线程的方式启动?我们可以从接口的定义出发考虑这个问题:
- 接口本身是为了解耦,那么我们将需要解耦的方法定义在接口中
- 我们现在的目的是为了执行一个任务:实现了Runnable接口的RunnableTask,至于怎么执行我不知道。我可以开启一个线程来执行,我也可以同步执行
- 那么这时,如果某时我觉得启动一个线程执行该任务较好,那么我写上了new Thread(new(RunnableTask())).start(),某时我觉得还是在当前线程执行比较好,那么如果我没有抽取该接口,那么执行该方法的逻辑将会耦合在业务代码中。到时候到处找该代码进行修改
- 这时,顺理成章的我们定义了一个接口,将封装的变化放到Executor接口的实现类中执行,我们只需要定义多个Executor的子类,完成自己的逻辑,在需要时我们可以通过类似Spring的IOC方式,来修改,而不需要改动源码
- 这时我们称:接口解耦合
public interface Executor {
void execute(Runnable command);
}
2
3
4
5
ExecutorService 原理
ExecutorService接口是Executor接口的次一级接口,读者可以看到:在Executor接口中定义一个执行方法,而在ExecutorService接口中需要定义一些用于服务执行器的方法。读者可以这么想:Executor接口用于抽象任务执行,ExecutorService接口用于抽象在执行任务时的一些功能函数。源码描述如下。
public interface ExecutorService extends Executor {
// 关闭服务
void shutdown();
// 立刻关闭服务
List<Runnable> shutdownNow();
// 判断服务是否调用了shutdown
boolean isShutdown();
// 判断服务是否完全终止
boolean isTerminated();
// 等待当前服务停止
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个任务Callable task,返回一个存根stub
<T> Future<T> submit(Callable<T> task);
// 提交一个任务Runnable task,返回一个存根stub
Future<?> submit(Runnable task);
// 提交一组callable任务执行,返回所有任务的存根stub
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交一组任务Callable task,返回所有任务的存根stub。注意,这个包含了等待执行的时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一组任务,等待其中任何一个任务完成后返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交一组任务,等待其中任何一个任务完成后返回。注意,这个包含了等待执行的时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
AbstractExecutorService 原理
对于面向对象的设计模式来说,这种又长又多的接口,怎样能够让子类轻松继承呢?使用模板方法设计模式。我们可以通过抽象类来实现部分公用算法,这样子类,即线程池的实现就会轻松些了,毕竟不用自己再写一遍这些算法了,AbstractExecutorService抽象类的作用便是如此。可以看到该类实现了大部分的ExecutorService定义的功能方法,将核心的execute方法放到子类实现。源码如下。
public abstract class AbstractExecutorService implements ExecutorService {
// 将Runnable对象封装为FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 将Callable对象封装为FutureTask
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 封装Runnable task,然后调用execute执行
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 封装Runnable task,然后调用execute执行。注意,这里还封装了结果
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 封装Callable task,然后调用execute执行
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/* 执行传入的任务,只要任何一个任务被执行完成就返回,然后取消其他任务,这里Any指
任何一个tasks中的任务执行完成,timed和nanos用来表明执行超时时间 */
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
// 任务判空
if (tasks == null) throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0) throw new IllegalArgumentException();
// 创建用于保存执行stub的future对象的集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// 创建ExecutorCompletionService用于执行任务
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
try {
// 记录异常,这样如果我们不能获得任何result,可以抛出最后一个异常。
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 获取集合的迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
// 先开始一项任务,其余的增量执行
futures.add(ecs.submit(it.next()));
// 任务数减1
--ntasks;
// 正在执行的任务为1
int active = 1;
for (;;) {
/* 从ExecutorCompletionService中获取一个执行任务的future,如果future为空,判
断任务数量是否大于0,如果还有下一个任务,继续递交执行。如果future不为空,
直接调用 f.get()返回执行结果,然后在finally中取消所有任务。读者可能会问,
ExecutorCompletionService是什么?很简单,看看它的构造器:
ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>>
completionQueue),一个执行器,一个阻塞队列,功能是在执行器executor
中执行任务,任务完成后将结果放入completionQueue队列中,然后生产端从这个
队列中获取结果。这就是生产者消费者模型 */
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
/* 如果指定了超时时间,等待nanos纳秒后如果future还是为空,则
抛出超时异常 */
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
/* 当然,如果所有任务都放到线程池里执行了,那么直接调用take方法
阻塞当前线程,等待任务执行完成后返回。poll方法非阻塞或者带超时
时间的阻塞,take方法阻塞直到队列里放入结果,这里是future */
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
// 传入true表明中断执行线程
futures.get(i).cancel(true);
}
}
// 间接调用doInvokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
return null;
}
}
// 带超时时间的间接调用doInvokeAny
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
/* 上面的invokeAny为其中一个任务执行完成后返回,这里的invokeAll和大家想的一样,直接
翻译All就行,就是执行所有传入的任务,然后等待所有任务执行完毕后返回 */
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null) throw new NullPointerException();
// 同样先来一个保存future对象的arraylist
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 直接遍历所有tasks来调用execute,以将其放到线程池中全部执行,将future放入集合中
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
/* 然后遍历future,等待每一个任务执行。注意这里的异常处理,当其中一个future
发生执行异常时则直接忽略掉,还是会设置done为true表明完成执行,
最后返回future,所以调用者应自行检查future是否发生了异常 */
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
/* 如果发生异常导致任务执行失败,即不属于CancellationException和
ExecutionException,那么取消所有的任务 */
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
// 带超时时间的执行所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null) throw new NullPointerException();
// 将超时时间转为纳秒
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
/* 遍历执行所有任务,将其转变为FutureTask放入future集合中,然后计算deadline
超时时间 */
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
/* 遍历所有的任务,开始执行。如果达到超时时间,直接返回futures。可以看到,
超时时间设置过短,会导致任务未开始执行就返回,而且没有取消任务,这时
返回的futures中有可能还在线程池中执行。这点在使用时务必小心 */
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
/* 到这里,所有任务都放到了线程池中执行。遍历所有的future,根据超时时间选择
返回或者等待 */
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// 同样,如果出现未知异常,那么直接取消所有任务
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
ScheduledExecutorService 原理
ScheduledExecutorService 接口定义了任务周期性调度或者延迟调度的方法,我们说接口的继承等同于功能的扩展,而我们知道ExecutorService定义了执行器的服务方法,而ScheduledExecutorService继承自ExecutorService,那就等同于扩展了ExecutorService的方法,那么这些方法我们可以从源码中看到,可以对提交的Runnable command执行周期性调度。源码描述如下。
public interface ScheduledExecutorService extends ExecutorService {
// 在延迟delay时间后执行command。unit为时间单位。只调度一次
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
// 在延迟delay时间后执行callable。unit为时间单位。只调度一次
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
// 周期性调度command,在延迟initialDelay时间后执行command。period为周期时间,unit为时间单位。每次任务基于“上一次开始时间”来决定下次启动的时间, 如果一次任务执行时间大于间隔时间,会导致下一次延缓执行,不会有两次任务并行执行(也即:任务M在A时间开始执行,执行了B时间,那么这时任务M的下一次执行事件为A时间+周期时间)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 周期性调度command,在延迟initialDelay时间后执行command。period为周期时间,unit为时间单位。每次任务基于“上一次结束时间”来延迟固定时间后执行下一次任务(也即:任务M在A时间开始执行,执行了B时间,那么这时任务M的下一次执行事件为A时间+B时间+周期时间)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39