# Netty 核心原理十四 ThreadPerChannelEventLoop 原理与Future原理
ThreadPerChannelEventLoop 原理
ThreadPerChannelEventLoop类用于处理OIO模型的Channel通道对象,也即一个Channel由一个ThreadPerChannelEventLoop对象来处理。该类较为简单,因为大部分方法都在SingleThreadEventLoop中实现。这里主要初始化父类,同时在run方法中不断地从队列中获取任务处理,直到通道被解除注册。源码描述如下。
public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
private final ThreadPerChannelEventLoopGroup parent;
private Channel ch;
public ThreadPerChannelEventLoop(ThreadPerChannelEventLoopGroup parent) {
super(parent, parent.executor, true);
this.parent = parent;
}
// 实现通道注册。通过父类super.register来进行注册,同时添加了监听器,当注册完成时回调获取当前Channel对象。当注册失败时,调用deregister方法,将自身从ThreadPerChannelEventLoopGroup中移除
@Override
public ChannelFuture register(ChannelPromise promise) {
return super.register(promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = future.channel();
} else {
deregister();
}
}
});
}
// 主要事件函数
@Override
protected void run() {
for (;;) {
// 获取任务执行(后面我们在说OioChannel时会看到该任务的提交)
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
// 检查通道是否关闭
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;
}
} else {
if (ch != null) {
// 如果通道已经解除注册,那么执行队列中所有任务后,结束当前事件循环
if (!ch.isRegistered()) {
runAllTasks();
deregister();
}
}
}
}
}
// 将自身从父ThreadPerChannelEventLoopGroup中移除
protected void deregister() {
ch = null; // 通道对象不再使用,释放引用
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
Future 原理
Future我们在学习JUC时,可以看到,它代表了一个异步执行操作。我们可以通过Future来获取到异步执行的任务状态,也可以等待任务执行完成,获取任务执行结果。同理,在Netty中,该Future继承自JUC的Future,扩展了一些方法,可以看到该方法主要添加了监听器操作。可以想想这有何好处?原来的Future没有监听器模式,那么我们只能通过get来阻塞的等待任务结束,而有了监听器,我们只需要向Future异步执行任务中添加一个监听器即可,当任务执行完成将由执行线程来完成该回调任务。同时读者也可以从中抽取异同部分:响应中断等待、不可响应中断等待、超时等待、不超时等待。源码描述如下。
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* 判断当前IO操作是否已经执行完成
*/
boolean isSuccess();
/**
* 判断我们是否能够通过cancel(boolean)方法结束任务执行
*/
boolean isCancellable();
/**
* 获取导致IO操作失败的异常
*/
Throwable cause();
/**
向当前Future添加监听器,当Future代表的异步任务执行完成时,将会回调该监听器。当然,如果当前任务已经完 成,那么将会直接执行该监听器。
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 添加一组监听器
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 移除监听器
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 移除一组监听器
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 等待当前Future代表的异步任务执行完成,当任务执行失败时,将会把异常信息抛出给调用方。可响应线程中断。
*/
Future<V> sync() throws InterruptedException;
/**
* 等待当前Future代表的异步任务执行完成,当任务(作者:黄俊,微信:bx_java)执行失败时,将会把异常信息抛出给调用方。不响应线程中断。
*/
Future<V> syncUninterruptibly();
/**
* 等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给调用方。可响应线程中断
*/
Future<V> await() throws InterruptedException;
/**
* 等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给调用方。不响应线程中断
*/
Future<V> awaitUninterruptibly();
/**
* 指定超时时间,等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给调用方。可响应线程中断
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 指定超时时间单位为毫秒,为上一个方法的便捷版本。笔者认为:这里冗余了。等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给调用方。可响应线程中断
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* 指定超时时间,等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给调用方。不响应线程中断
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* 指定超时时间单位为毫秒,为上一个方法的便捷版本。笔者认为:这里冗余了。等待当前Future代表的异步任务执行完成,但如果任务执行失败,将不会把异常信息抛出给(作者:黄俊,微信:bx_java)调用方。不响应线程中断
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* 立即返回当前Future代表的异步任务的结果,如果还未完成,那么返回null
*/
V getNow();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
GenericFutureListener 原理
GenericFutureListener 接口用于表示一个监听代表异步执行任务的Future监听器。当Future被标识完成时,将会立即回调该接口的operationComplete方法,可以看到在该方法参数中传入了Future,我们就可以通过该Future来获取异步任务的执行结果。源码描述如下。
// JDK的标记接口。表明一个事件监听器
public interface EventListener {
}
// 通用监听器接口。读者需要注意这里的泛型定义:F extends Future<?>,表明只要是Future的子接口都可以传入
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* 当Future执行完成时,将回调该方法
*/
void operationComplete(F future) throws Exception;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
FutureListener 原理
该接口较为简单,仅仅就是将通用的事件监听器GenericFutureListener的泛型,进行固定,固定为Future。源码描述如下。
public interface FutureListener<V> extends GenericFutureListener<Future<V>> { }
AbstractFuture 原理
该抽象类为Future的模板类。我们从源码中可以看到,该接口主要实现了原生JDK的Future中的get方法。源码描述如下。
public abstract class AbstractFuture<V> implements Future<V> {
// 阻塞等待当前Future代表的异步任务完成
@Override
public V get() throws InterruptedException, ExecutionException {
await(); // 等待任务执行完成
Throwable cause = cause();
if (cause == null) { // 没有发生异常,那么直接返回当前结果
return getNow();
}
// 执行异常,那么包装异常对象返回
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 同get()方法,只不过这里进行了超时等待,同时在超时时间到达后,抛出TimeoutException
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
ChannelFuture 原理
ChannelFuture用于表示一个异步执行的Channel通道的IO操作。该接口较为简单,这里扩展的Future接口方法为 channel(),可用于获取与之关联的Channel通道对象。源码描述如下。
public interface ChannelFuture extends Future<Void> {
/**
* 与当前Future关联的通道对象
*/
Channel channel();
/**
* 如果当前Future代表了一个空的Future,那么返回true。这时,将不允许调用以下方法:
* addListener(GenericFutureListener)
* addListeners(GenericFutureListener[])
* await()}
* await(long, TimeUnit)} ()
* await(long)} ()
* awaitUninterruptibly()
* sync()
* syncUninterruptibly()
*/
boolean isVoid();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39