# Netty 核心原理十六 ChannelPromise 原理
ChannelPromise 原理
该接口用于扩展ChannelFuture和Promise接口,提供了对ChannelFuture变量的写操作。我们看到该接口主要提供了对Channel的处理设置成功的空参数操作,同时支持返回一个新的ChannelPromise对象。源码描述如下。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
// 设置通道执行成功,此时不需要提供参数
ChannelPromise setSuccess();
boolean trySuccess();
// 如果接口的实例isVoid方法返回true,那么返回一个新的ChannelPromise,否则返回该接口本身
ChannelPromise unvoid();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DefaultChannelPromise 原理
该类实现了ChannelPromise接口,但是我们看到它继承自DefaultPromise类,所以大部分工作都在DefaultPromise父类中实现,该类主要提供对ChannelFuture接口和ChannelPromise接口的新增方法进行实现,我们主要看看这几个方法和变量即可。通过源码我们可以得知如下信息:
- DefaultChannelPromise实现了FlushCheckpoint接口,该接口用于刷新还原点,后面我们将会详细介绍该接口和其所属类
- 定义了所属通道对象和还原点变量
- setSuccess()设置空结果完成,最终通过调用父类DefaultPromise的setSuccess(null)完成设置
- 用于isVoid总是返回false,所以unvoid()方法将总是返回当前DefaultChannelPromise对象
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel; // 所属通道对象
private long checkpoint; // 当前还原点
// 构造器。通过通道对象创建
public DefaultChannelPromise(Channel channel) {
this.channel = channel;
}
// 构造器。通过通道对象和执行器创建
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
// 设置空结果返回
@Override
public ChannelPromise setSuccess() {
return setSuccess(null);
}
@Override
public ChannelPromise setSuccess(Void result) {
super.setSuccess(result);
return this;
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
// unvoid总是返回当前ChannelPromise
@Override
public ChannelPromise unvoid() {
return this;
}
// isVoid总是返回false
@Override
public boolean isVoid() {
return false;
}
// 刷新和设置当前还原点
@Override
public long flushCheckpoint() {
return checkpoint;
}
@Override
public void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
接下来我们来看看ChannelFlushPromiseNotifier类,因为FlushCheckpoint属于该类的内部接口。ChannelFlushPromiseNotifier类允许向其中注册ChannelFuture实例,一旦写入了一定数量的数据并到达检查点,ChannelFuture就会被通知。 该类在Netty中不用,了解下即可。源码描述如下。
public final class ChannelFlushPromiseNotifier {
private long writeCounter; // 数据写入量
private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>(); // 保存注册的ChannelFuture实例
private final boolean tryNotify; // 用于控制通知方式。true:调用ChannelFuture的tryFailure或者trySuccess false:调用ChannelFuture的setFailure或者setSuccess
// 构造器。用于设置tryNotify变量
public ChannelFlushPromiseNotifier(boolean tryNotify) {
this.tryNotify = tryNotify;
}
// 向当前ChannelFlushPromiseNotifier中添加ChannelPromise实例,它将在指定的pendingDataSize到达后被通知
public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
if (promise == null) {
throw new NullPointerException("promise");
}
if (pendingDataSize < 0) {
throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
}
long checkpoint = writeCounter + pendingDataSize;
if (promise instanceof FlushCheckpoint) { // 若已经实现了FlushCheckpoint接口,设置新的checkpoint同时将其添加到队列中
FlushCheckpoint cp = (FlushCheckpoint) promise;
cp.flushCheckpoint(checkpoint);
flushCheckpoints.add(cp);
} else { // 否则包装到默认的FlushCheckpoint接口实现类,然后将其添加到队列中
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
}
return this;
}
// 通知注册的ChannelPromise实例
public ChannelFlushPromiseNotifier notifyPromises() {
notifyPromises0(null);
return this;
}
// 实际通知操作
private void notifyPromises0(Throwable cause) {
if (flushCheckpoints.isEmpty()) { // 没有需要通知的实例,那么直接返回
writeCounter = 0;
return;
}
final long writeCounter = this.writeCounter;
for (;;) { // 遍历处理注册的ChannelPromise实例
FlushCheckpoint cp = flushCheckpoints.peek(); // 获取队列头部的ChannelPromise实例
if (cp == null) { // 如果通知列表中没有任何内容,则重置计数器
this.writeCounter = 0;
break;
}
if (cp.flushCheckpoint() > writeCounter) { // ChannelPromise实例的还原点大于writeCounter
if (writeCounter > 0 && flushCheckpoints.size() == 1) { // 队列中只有一个ChannelPromise实例
this.writeCounter = 0; // 还原写入数量
cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter); // 重新设置Checkpoint
}
break; // 由于我们是FIFO的队列,所以头部的还原点未达到pendingDataSize,结束循环
}
// 从队列中移除一个ChannelPromise实例实例,同时根据tryNotify变量来调用不同方法完成通知
flushCheckpoints.remove();
ChannelPromise promise = cp.promise();
if (cause == null) { // 异常为空,那么调用空成功方法
if (tryNotify) {
promise.trySuccess();
} else {
promise.setSuccess();
}
} else {
if (tryNotify) {
promise.tryFailure(cause);
} else {
promise.setFailure(cause);
}
}
}
// 避免写入数量发生溢出
final long newWriteCounter = this.writeCounter;
if (newWriteCounter >= 0x8000000000L) {
// 只有当计数器变得非常大时才重置计数器
this.writeCounter = 0;
for (FlushCheckpoint cp: flushCheckpoints) { // 遍历列表修正Checkpoint
cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
}
}
}
// 内部接口,用于操作Checkpoint
interface FlushCheckpoint {
long flushCheckpoint();
void flushCheckpoint(long checkpoint);
ChannelPromise promise();
}
// 默认FlushCheckpoint包装类,当传入的ChannelPromise没有实现FlushCheckpoint接口时,包装到其中
private static class DefaultFlushCheckpoint implements FlushCheckpoint {
private long checkpoint;
private final ChannelPromise future;
DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
this.checkpoint = checkpoint;
this.future = future;
}
@Override
public long flushCheckpoint() {
return checkpoint;
}
@Override
public void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
public ChannelPromise promise() {
return future;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
DefaultChannelGroupFuture 原理
该类用于实现ChannelGroupFuture接口。前面我们知道ChannelGroupFuture接口代表了一组Channel的异步的IO操作。现在我们来看看默认的实现如何对这些操作进行管理。通过源码我们得知:
- 该类继承自DefaultPromise,所以大部分操作均在DefaultPromise类中完成
- successCount变量记录了成功数量
- failureCount变量记录了失败数量
- Map<Channel, ChannelFuture> futures保存了管理的通道的异步IO操作集合
- 当我们在构建DefaultChannelGroupFuture实例时,将向管理的所有ChannelFuture中添加childListener监听器,这时当异步操作完成后将会回调其中的operationComplete方法,在该方法中我们完成如下操作:
- 记录成功和失败的IO异步任务个数
- 当所有异步IO操作均完成时(callSetDone为true),那么根据管理的IO异步任务是否存在失败来选择调用setFailure0或者setSuccess0
- 当我们调用setFailure0时,将遍历所有ChannelFuture,找到失败的ChannelFuture,然后将其和Channel封装为DefaultEntry元组数组,作为参数调用setFailure0方法
final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
private final ChannelGroup group; // 所属通道组
private final Map<Channel, ChannelFuture> futures; // 通道的异步IO操作集合
private int successCount; // 成功数量
private int failureCount; // 失败数量
private final ChannelFutureListener childListener = new ChannelFutureListener() { // 用于向管理的ChannelFuture添加的监听器
@Override
public void operationComplete(ChannelFuture future) throws Exception {
boolean success = future.isSuccess();
boolean callSetDone; // 表示所有异步IO操作均完成
synchronized (DefaultChannelGroupFuture.this) { // 记录成功和失败个数
if (success) {
successCount ++;
} else {
failureCount ++;
}
callSetDone = successCount + failureCount == futures.size();
assert successCount + failureCount <= futures.size();
}
if (callSetDone) { // 所有异步操作完成后,根据结果来调用setFailure0或者setSuccess0
if (failureCount > 0) { // 有执行失败的通道IO任务
List<Map.Entry<Channel, Throwable>> failed =
new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
for (ChannelFuture f: futures.values()) { // 遍历添加失败的ChannelFuture到failed列表中
if (!f.isSuccess()) {
failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause())); // 包装通道和所属异常信息到DefaultEntry
}
}
setFailure0(new ChannelGroupException(failed));
} else {
setSuccess0();
}
}
}
};
// 构造器,用于初始化成员变量
DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
super(executor);
this.group = group;
this.futures = Collections.unmodifiableMap(futures);
for (ChannelFuture f: this.futures.values()) { // 遍历ChannelFuture,向其中添加监听器
f.addListener(childListener);
}
if (this.futures.isEmpty()) { // Future为空,那么直接设置成功
setSuccess0();
}
}
// 可以看到以下两个方法的判断次序:成功个数先判断还是失败个数先判断
@Override
public synchronized boolean isPartialSuccess() {
return successCount != 0 && successCount != futures.size();
}
@Override
public synchronized boolean isPartialFailure() {
return failureCount != 0 && failureCount != futures.size();
}
// 内部类用于封装通道和异常对象的元组
private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
private final K key;
private final V value;
DefaultEntry(K key, V value) {
this.key = key;
this.value = value;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143