# Netty 核心原理三 MultithreadEventExecutorGroup 原理一
MultithreadEventExecutorGroup 原理
AbstractEventExecutorGroup类对上层接口的方法进行粗略实现:每个方法都调用next方法找到下一个线程来异步执行,同时返回一个Future表示异步执行的任务。而MultithreadEventExecutorGroup 则是完成了多线程的事件执行器组的实现。
核心变量与构造器
我们看到成员变量的声明中看到:
- EventExecutor[] children 表示子事件执行器数组,这时我们具备了执行器组的特性:拥有了子执行器
- AtomicInteger terminatedChildren 用于记录已经终止的子事件执行器数量,结合Promise<?> terminationFuture,我们可以让需要执行器组终止的线程进行异步等待和事件回调
- EventExecutorChooserFactory.EventExecutorChooser chooser 用于从子执行器数组中根据该接口的实例来完成下一个子执行器的获取(负载均衡器),这时我们可以在next方法中调用该选择器完成对下一个子执行器的选择
在最终的构造函数中,我们看到流程如下:
- 创建子执行器数组
- 调用子类实现的newChild方法创建子执行器的方法,使用其返回的实例完成初始化。在创建过程中,只要有一个子执行器创建实例失败,那么需要全部关闭
- 通过选择器工厂创建选择器实例
- 创建终止监听器,并将该监听器放入子执行器中
- 将子执行器数组设置其为只读执行器,不允许任何修改
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children; // 子事件执行器数组(执行器组)
private final Set<EventExecutor> readonlyChildren; // 只读子事件执行器
private final AtomicInteger terminatedChildren = new AtomicInteger(); // 已经终止的子事件执行器数量
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);// 子事件执行器终止Promise对象,我们可以在关闭执行器组时通过该Future接口来异步获取关闭结果
private final EventExecutorChooserFactory.EventExecutorChooser chooser; // 用于从子执行器数组中根据该接口的实例来完成下一个子执行器的获取(负载均衡器)
// 构造函数,提供了默认的线程工厂:ThreadPerTaskExecutor
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
// 构造函数提供默认的EventExecutorChooser实例:DefaultEventExecutorChooserFactory.INSTANCE
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// 完整构造器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) { // 参数校验
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) { // Executor实例未指定的情况下,创建默认的ThreadPerTaskExecutor实例
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads]; // 创建子执行器数组
for (int i = 0; i < nThreads; i ++) { // 初始化子执行器数组
boolean success = false;
try {
children[i] = newChild(executor, args); // 调用子类实现的创建子执行器的方法来完成初始化(为何如此设计?很明显:当前类并不知道这个子执行器的实例是谁,我只知道需要创建而已,抽象方法的魅力再一次展现)
success = true;
} catch (Exception e) {
// 创建出现异常,那么包装异常对象
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) { // 只要有一个子执行器创建实例失败,那么需要全部关闭
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) { // 当前线程等待子执行器全部关闭
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) { // 等待期间被中断唤醒,那么将当前线程中断标志位重置(中断异常会消耗标志位),由调用方来处理该中断
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children); // 通过工厂创建选择器
// 创建终止监听器,并将该监听器放入子执行器中。可以看到该监听器,在操作完成时,对当前类的terminatedChildren终止线程数进行自增,当最后一个子执行器执行完成后,设置terminationFuture的成功结果为null,表示终止完成
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 一旦执行器数组创建完成,那么将设置其为只读执行器,不允许任何修改
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
EventExecutorChooserFactory与DefaultEventExecutorChooserFactory原理
我们看到EventExecutorChooserFactory接口定义了创建EventExecutorChooser实例的方法,而由于EventExecutorChooser接口属于EventExecutorChooserFactory,所以在接口内部定义了EventExecutorChooser选择器实例的选择功能函数。
在默认的选择器DefaultEventExecutorChooserFactory工厂中,我们定义了一个全局的INSTANCE单例对象。同时在创建EventExecutorChooser实例时,将会根据子执行器数组的长度是否为2的倍数来选择合适的取余算法,而在内部取余的过程是通过原子性的自增来完成。所以我们也可以理解为默认执行器选择器执行算法为:轮询。源码描述如下。
public interface EventExecutorChooserFactory {
/**
\* 返回新的EventExecutorChooser实例
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
// 定义选择功能方法
interface EventExecutorChooser {
/**
\* 返回下一个子执行器
*/
EventExecutor next();
}
}
// 默认选择器工厂
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
// 默认单例
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
// 通过executors子执行器数组来完成创建。通过检测子执行器数组的长度来选择对应的EventExecutorChooser实例
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// 笔者说过:二进制的与操作符等于二进制截断,那么这里相当于对val本身和其负数值截断,比如:val为8,那么二进制为:(这里为32位,前面的28个0省略)1000,-8为:(省略28个1)1000,那么执行与操作符,此时截断了前面29个1的值,这时为原值,表明为2的倍数。但是如果:val为7,那么二进制为111,-7为:(省略29个1)001,此时执行与运算,那么将截断为1
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
// 数组长度为2的倍数时,我们可以用二进制截断来实现:idx.getAndIncrement() & executors.length - 1,通过与运算加速取余数操作。还是以8为例,8-1=7,二进制为0111,此时截断idx.getAndIncrement()的低三位,正好落在数组 0 - 7的index上
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger(); // 原子性自增(就算溢出为负数,也不影响)
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// 数组长度不为2的倍数,那么只能用相对较慢的abs取绝对值,然后%来计算长度。为何取绝对值?AtomicInteger溢出为负数怎么办?
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139