# Netty 核心原理十 ThreadPerChannelEventLoopGroup原理
ThreadPerChannelEventLoopGroup 将会为每一个通道创建一个自己的EventLoop实例,我们看到这种特性就是BIO的特性,当然我们在后面描述oio时将会用到该类。读者注意:该类在生产线上慎用(通常不用),因为无法利用NIO的特性,一个通道一个线程将会导致系统性能极度下降。
核心变量与构造器
通过变量的定义,很容易的理解到以下几点:
- 该类对于每个通道都使用一个EventLoop对象来处理
- 由于通道数可能非常大,这时可以通过设置maxChannels来限制并发的通道数
- 当通道数达到最大时,将会抛出ChannelException tooManyChannels异常
- 我们创建activeChildren和idleChildren来缓存EventLoop对象,提升性能
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
private final Object[] childArgs; // 传递给管理的EventLoop对象使用参数
private final int maxChannels; // 最大通道数
final Executor executor; // 执行器对象
final Set<EventLoop> activeChildren =
Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap()); // 管理活动的EventLoop对象
final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>(); // 管理空闲的EventLoop对象
private final ChannelException tooManyChannels; // 在通道数量超过最大通道数时抛出的异常对象
private volatile boolean shuttingDown; // 是否处于 关闭中 状态
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE); // 关闭结果Promise,调用方可以根据该对象来判断当前执行器组是否已经终止
private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
}
}; // EventLoop对象终止后回调的监听器
// 构造方法:设置maxChannels最大通道数为0,表示无限制
protected ThreadPerChannelEventLoopGroup() {
this(0);
}
// 构造方法:设置maxChannels最大通道数为maxChannels,并提供了默认工厂函数
protected ThreadPerChannelEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
// 构造方法:设置maxChannels最大通道数为maxChannels,并提供了默认执行器(每个任务创建一个线程来执行)
protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);
}
// 完整构造器:对参数进行初始化
protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (executor == null) {
throw new NullPointerException("executor");
}
if (args == null) {
childArgs = EmptyArrays.EMPTY_OBJECTS;
} else {
childArgs = args.clone();
}
this.maxChannels = maxChannels;
this.executor = executor;
tooManyChannels = ThrowableUtil.unknownStackTrace(
new ChannelException("too many channels (max: " + maxChannels + ')'),
ThreadPerChannelEventLoopGroup.class, "nextChild()");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
核心方法newChild原理
该方法将创建管理的EventLoop对象。可以看到创建的类型为ThreadPerChannelEventLoop对象。源码描述如下。
protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
return new ThreadPerChannelEventLoop(this);
}
1
2
3
4
5
2
3
4
5
核心方法next原理
由于该类对于每个通道都由一个EventLoop对象来执行,所以这里的next方法将不支持操作。源码描述如下。
public EventLoop next() {
throw new UnsupportedOperationException();
}
1
2
3
4
5
2
3
4
5
核心方法shutdownGracefully原理
该方法用于关闭事件循环组。流程如下:
- 设置shuttingDown为true,标识已经处于关闭中状态
- 关闭所有处于活动中的EventLoop
- 关闭所有空闲的EventLoop
- 如果此时状态已经变为终止状态,那么设置terminationFuture执行成功
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
shuttingDown = true; // 标识已经处于关闭中状态
for (EventLoop l: activeChildren) { // 关闭所有处于活动中的EventLoop
l.shutdownGracefully(quietPeriod, timeout, unit);
}
for (EventLoop l: idleChildren) { // 关闭所有空闲的EventLoop
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// 如果此时状态已经变为终止状态,那么设置terminationFuture执行成功,那么此时调用方可以通过terminationFuture来获取终止状态
if (isTerminated()) {
terminationFuture.trySuccess(null);
}
return terminationFuture();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
核心方法awaitTermination原理
该方法用于超时等待事件循环组终止。流程如下:
- 计算超时时间
- 遍历所有活动的EventLoop,调用他们的awaitTermination
- 遍历所有空闲的EventLoop,调用他们的awaitTermination
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout); // 计算超时时间
for (EventLoop l: activeChildren) { // 遍历所有活动的EventLoop,调用他们的awaitTermination
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) { // 检查是否超时
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) { // 遍历所有空闲的EventLoop,调用他们的awaitTermination
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) { // 检查是否超时
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
核心方法register原理
该系列方法将把传递进来的Channel通道注册到管理的EventLoop中。我们看到核心方法就是nextChild,该方法将返回一个可用的EventLoop,然后调用其register方法完成注册。只不过区别如下:
- register(Channel channel)在注册时创建了一个DefaultChannelPromise对通道进行包装
- register(ChannelPromise promise)直接将ChannelPromise 传递给l.register方法
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
EventLoop l = nextChild();
return l.register(new DefaultChannelPromise(channel, l));
} catch (Throwable t) {
return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
}
}
@Override
public ChannelFuture register(ChannelPromise promise) {
try {
return nextChild().register(promise);
} catch (Throwable t) {
promise.setFailure(t);
return promise;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
核心方法nextChild原理
上面我们看到该方法将会返回一个可用的EventLoop。流程如下:
- 判断状态是否已经处于关闭中,此时将抛出任务拒绝异常
- 尝试从空闲的队列中获取EventLoop。如果获取失败,那么判断是否达到最大通道数,如果达到,那么抛出tooManyChannels异常,否则创建一个新的EventLoop将其添加到activeChildren活动对象集中,并返回该EventLoop
private EventLoop nextChild() throws Exception {
if (shuttingDown) {
throw new RejectedExecutionException("shutting down");
}
EventLoop loop = idleChildren.poll();
if (loop == null) { // 获取失败
if (maxChannels > 0 && activeChildren.size() >= maxChannels) { // 检查是否达到最大通道限制
throw tooManyChannels;
}
loop = newChild(childArgs); // 创建新的EventLoop
loop.terminationFuture().addListener(childTerminationListener);
}
activeChildren.add(loop); // 添加到活动集
return loop;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31