# Netty 核心原理十九 通道处理详细设计要领
使用实例
我们来看一个实例:
- 定义了一个服务端用于处理Http请求的服务端
- 在childHandler(new HttpHelloWorldServerInitializer(sslCtx)) 中配置了用于处理客户端数据的操作
- 在HttpHelloWorldServerHandler中接收配置的通道处理器对客户端的Http请求信息的处理结果,进行业务处理,然后封装Http响应信息写入到通道中
public final class HttpHelloWorldServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// 配置 SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 配置服务器
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 主事件循环组(用于接收客户端连接)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 工作事件循环组(前面我们看到过默认的数量为:CPU 核心数 * 2)将每个接收到的连接绑定到工作事件循环组中的线程,之后所有的该连接的操作都由该线程来完成,保证了线程安全(参考之前对于事件循环组的原理)
try {
ServerBootstrap b = new ServerBootstrap(); // 服务端 启动器
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx)); // 看这里(重点)
// 绑定接收客户端端口
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// 通道初始化
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
// 对接收到的客户端连接的SocketChannel配置通道处理器
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpRequestDecoder()); // Netty 自带解析Http请求的处理
p.addLast(new HttpResponseEncoder()); // Netty 自带编码Http信息的处理
p.addLast(new HttpObjectAggregator(2000));// Netty 自带聚合从Http中读取到的数据,提供统一数据(比如:HttpMessage(Http头部)、HttpContent(Http内容))
p.addLast(new HttpHelloWorldServerHandler()); // 业务处理器
}
}
// 业务处理器
public class HttpHelloWorldServerHandler extends ChannelInboundHandlerAdapter {
private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd' };
private static final AsciiString CONTENT_TYPE = new AsciiString("Content-Type");
private static final AsciiString CONTENT_LENGTH = new AsciiString("Content-Length");
private static final AsciiString CONNECTION = new AsciiString("Connection");
private static final AsciiString KEEP_ALIVE = new AsciiString("keep-alive");
// 读取完成所有客户端数据后,将响应信息写入客户端
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
if (HttpUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
boolean keepAlive = HttpUtil.isKeepAlive(req);
// 包装 Http 响应体
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
设计推理
我们来对Netty的通道处理器来进行推理,因为混沌学习法的核心---推理+验证:
- 前面我们了解到Boss Group 和 Worker Group 分别用于接收来自客户端的连接 和 处理客户端的连接,为了保证操作Socket 的安全,我们将ServerSocketChannel唯一绑定到 BossGroup中的一个线程,接收到的SocketChannel保存到WorkerGroup中的线程,之后所有操作这些Socket的线程只能是它绑定的那一个事件循环组中的线程,而对于其他线程而言,想要操作这些Channel,那么只能向绑定这些Channel的线程提交Task并放入这些线程的队列中,由这些线程来处理
- 那么我们很容易得出处理流程:BossGroup中的某个线程唯一绑定了我们上述例子的 NioServerSocketChannel 的实例,当接收到客户端连接时,将会从ServerSocketChannel中创建出SocketChannel对象,那么考虑下,我能不能在ServerSocketChannel处理时对该通道做一些处理呢?当SocketChannel对象绑定到Worker Group 中的线程后,我们将可以读取客户端的数据,而我们不可能让用户直接处理这些来自客户端的原始数据(Java SE 的网络模块了解的同学应该知道:从SocketChannel中获取到的数据将是TCP/UDP 运输层的数据,不涉及到任何应用层协议)那么,我们就需要设计一个架构来处理这些数据,同时让使用Netty的开发者在Netty自带的应用层协议的处理下,轻松愉快的专注于业务开发,而不是对这些数据进行编码(响应客户端数据时)和解码(接收客户端数据时)
- 那么问题就很明显了:使用某种已经存在的设计模式,来完成对通道的原始数据(运输层)处理,同时也可以提供高度灵活的插拔模式让用户自定义自己的处理器来完成数据的处理,最关键的就是 :让使用Netty的业务方脱离出对数据本身协议的处理:HTTP、PROTOBUF等等
这时,我们很很容易的想到:责任链模式。我们可以将这些通道处理器形成一条流水线,让接收到的客户端数据,如流水般依次流过这些处理器,这样我们就可以让处理器本身与Netty的数据处理完全解耦,并且Netty的应用方,只需要按照顺序定义好流水线即可,Netty也可以实现很多现成的通道处理器提供给业务方使用。比如上述例子的:HttpRequestDecoder(Http协议解码器)、HttpResponseEncoder(Http协议编码器)、HttpObjectAggregator(Http协议聚合器),而对于业务方自身的业务Handler只需要放在编码器和解码器的中间,接收来自上游处理器已经解码的数据即可,处理完成后将数据发送给下游,下游将自动完成Http协议响应数据的封装,这样就完成了我们的需求目标。
实现架构
Netty中使用ChannelPipeline接口来定义Handler流水线,使用ChannelHandler接口来定义通道处理器,ChannelPipeline负责组合一系列的ChannelHandler实例,这些ChannelHandler实例共同完成对通道的输入数据处理。这时,我们不难发现有如下问题:
- 通道处理链我们可以使用链表来处理,那么由谁来维护链表呢?
- 是需要ChannelHandler接口的实现类来维护么?
- 调用链表中的下一个ChannelHandler的代码由谁来调用呢?
自然,我们可以想到,将这些维护链表的方法和属性抽取出来,使用包装模式将ChannelHandler包装到实际处理这些事情的类中,实现两者的解耦,使用Netty的开发人员无需知道Netty的实现细节,只需要实现ChannelHandler接口完成自身的业务处理即可。这时,Netty使用ChannelHandlerContext 接口来描述ChannelPipeline流水线链表的元数据信息,同时在其中实现了传递流水线处理数据的方法。
而对于ChannelHandler接口而言,我们知道数据的处理流向有两个:输入和输出,同样Netty将这两个不同流向的处理切分为两个子接口:ChannelInboundHandler、ChannelOutboundHandler,业务方可以根据需要处理的流向实现对应的接口完成自身的逻辑。
Netty 的整体实现如下图所示,inbound输入事件由Inbound Handler 输入处理器按照自底向上的方向处理。 Inbound Handler处理器通常处理上图底部的Netty 事件循环组I/O线程生成的输入数据,输入数据通常通过底层实际IO Socket的输入操作从远程对端读取得到,例如 SocketChannel.read(ByteBuffer) 方法将Socket中的输入数据读取到ByteBuffer缓冲区中,如果输入事件到了责任链顶部的Inbound Handler处理器仍未被处理,则会将其静默丢弃,或者在需要应用程序打开日志时将其记录下来。
outbound输出事件由Outbound Handler 输出处理器按照自顶向下的方向处理,如图右侧所示。 Outbound Handler处理器通常生成或者转换输出到底层Socket IO的报文,比如:HTTP和其他自定义协议。 如果outbound输出事件流转到了底部Outbound Handler处理器,则由与 Channel 对象关联的Netty 事件循环 I/O线程处理,该I/O线程通常执行实际的输出操作,比如调用:SocketChannel.write(ByteBuffer) 将ByteBuffer中的数据写入到底层SocketChannel中传输给对端。
例如我们可以定义如下责任链,在下面的例子中,名称以 Inbound 开头的类意味着它是一个Inbound Handler 输入处理器。名称以Outbound开头的类意味着它是一个Outbound Handler 输出处理器。 在给定的示例配置中,当读取数据事件发生时,处理顺序为:1、2、3、4、5。 当数据写出事件发生时,处理顺序为:5、4、 3 、2 、1。 在这个顺序上,ChannelPipeline流水线将会按照以下规则,来跳过对某些不必要的通道处理器来减少调用栈深度:
- 因为通道处理器3和4没有实现ChannelInboundHandler接口,因此一个读取数据事件的实际执行顺序为:1,2,5
- 因为通道处理器1和2没有实现ChannelOutboundHandler接口,因此一个写入数据事件的实际执行顺序为:5,4,3
- 因为通道处理器5同时实现 ChannelInboundHandler 和 ChannelOutboundHandler 接口,所以将会同时处理输入和输出事件
- 所以最终读取数据事件和写入数据事件分别为1、2、5和5、4、3
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
2
3
4
5
6
7
8
9
10
11
另外读者可能在图中看到:处理程序必须调用 ChannelHandlerContext 类中定义的事件传播方法(fireIN_EVT和OUT_EVT),将事件转发给它的下一个处理器。
I/O 请求,通过Channel对象或者ChannelHandlerContext对象调用
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ 方法调用 ] [ 方法调用 ] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty 事件循环线程完成对这些方法调用 |
+-------------------------------------------------------------------+
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
这下,整体实现明了了:底层Socket读取到数据后将由ChannelPipeline自动调用用户定义的 ChannelInboundHandler 实例依次完成数据的处理,当数据处理完成时,用户将通过Channel对象或者ChannelHandlerContext对象调用writeX的方法将数据写入到通道中,此时将由ChannelPipeline自动调用用户定义的ChannelOutboundHandler 实例依次完成数据的处理,最后将这些处理好的数据写入到底层Socket中。
那么按照我们上述描述的实现,那么必然存在一个头部处理器和尾部处理器。为何?读者想想:如果不存在这样的处理器,如何实现描述的定义:如果输入事件到了责任链顶部的Inbound Handler处理器仍未被处理,则会将其静默丢弃?输出事件到达最后需要执行实际的输出操作?这时,在Netty的DefaultChannelPipeline类中,存在如下两个特殊的ChannelHandler:HeadContext(处理输入事件和输出事件,输入事件将传递到链表中的第一个用户自定义的通道处理器,输出事件将调用Unsafe类完成实际通道的IO操作)。
// 处理器链头部上下文
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe; // 实现socket 底层的IO操作类
// 处理底层通道IO的读写操作
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
// 通道的输入事件发生时,自动调用处理器链中的下一个处理器
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
}
// 处理器链尾部上下文
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
// 所有输入事件,均不处理
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
// 捕捉到异常,或者输入事件的数据没有处理,那么调用onUnhandledInboundException方法,将资源释放并打印日志
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
// 释放资源msg,并打印debug日志
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
那么,我们继续思考:Netty 使用 ChannelInboundHandler 表示输入事件处理器,使用 ChannelOutboundHandler 表示输出事件处理器,那么这时我们需要实现一个特殊的处理器:编码和解码处理器,而对于这些处理器而言,它们包含公用的部分逻辑:输入缓冲区和输出缓冲区的操作,那么如何实现?在Netty中使用:ByteToMessageDecoder、MessageToMessageDecoder、MessageToMessageEncoder、MessageToByteEncoder 四个抽象类来表示这些编码和解码器。从字面上的意思很容易理解:ByteToMessageX 用于将字节数据转为对象。MessageToMessageX 用于将一个对象转为另一个对象。MessageToByteX 用于将对象转为字节数据。
细节分析
了解了上述的基本架构后,我们回到一开始的例子,来看看Netty内部对于例子中的通道处理器的具体实现。我们先来看handler(new LoggingHandler(LogLevel.INFO))。我们知道该handler将作为NioServerSocketChannel的处理器来完成对输入数据的处理,而LoggingHandler仅仅用于打印输入事件的日志。它将在哪里调用呢?通过源码我们知道,将会在ServerSocketChannel初始化时完成对该流水线的设置,同时我们看到一个特殊的ServerBootstrapAcceptor,它将负责接收来自客户端的连接,同时根据我们配置的HttpHelloWorldServerInitializer来完成流水线中的通道处理器定义。
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
// 绑定端口
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
}
// 完成ServerSocketChannel的创建和初始化
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel(); // 将调用我们设置的NioServerSocketChannel完成对ServerSocketChannel通道的创建
init(channel); // 初始化通道
}
// 使用ServerBootstrap对通道处理进行初始化
void init(Channel channel) throws Exception {
...
p.addLast(new ChannelInitializer<Channel>() { // 设置ServerSocketChannel的通道处理器流水线
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) { // 将我们通过 .handler(new LoggingHandler(LogLevel.INFO)) 方法放入的日志处理器添加到流水线中
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
}); // 将ServerBootstrapAcceptor用于接收客户端连接的通道处理器放入流水线中
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
我们接着来看ServerBootstrapAcceptor的实现。可以看到它为一个ChannelInboundHandler的实例,专用于处理输入事件。当客户端连接到来,时将会回调该处理器。通过源码我们看到:当输入事件发生时,也即客户端连接时,将在这里负责配置客户端通道流水线对象,将我们自定义的通道处理器childHandler放入其中。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 客户端连接到来时,msg将表示SocketChannel对象
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
那么这时我们来看例子中使用的几个通道处理器的定义。不难看出:HttpRequestDecoder为输入事件处理器、HttpResponseEncoder为输出事件处理器、HttpObjectAggregator用于聚合Http头部和消息体同样属于输入事件处理器。那么ChannelInitializer呢?也是一个输入处理器。但是我们看到其他的编码和解码器定义在HttpHelloWorldServerInitializer的initChannel方法中。那么不难看出,ChannelInitializer将用于初始化SocketChannel的流水线对象。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {}
public class HttpRequestDecoder extends HttpObjectDecoder {}
public abstract class HttpObjectDecoder extends ByteToMessageDecoder {}
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {}
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {}
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {}
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {}
public class HttpObjectAggregator extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> {}
public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends ByteBufHolder>
extends MessageToMessageDecoder<I> {}
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
那么最后一个问题来了:Netty何时处理ChannelInitializer呢?其实我们从定义都可以猜到了:由于它本身是一个输入事件处理器对象,肯定是某个输入事件触发了ChannelInitializer的initChannel方法。我们来看源码。通过源码很容易看到了:ChannelInitializer接收channelRegistered通道注册事件,在该事件中,将调用initChannel方法完成对通道流水线的初始化,完成后将自身从流水线中移除(功成身退)。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
protected abstract void initChannel(C ch) throws Exception; // 子类实现,完成通道初始化,其实就是流水线的初始化
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.pipeline().fireChannelRegistered();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
那么谁又触发了channelRegistered事件呢?通过源码,我们看到,当我们调用Channel ch = b.bind(PORT).sync().channel() 时,将会触发通道注册操作,当通道注册到事件循环组中的某个事件循环线程时,将会在其内部调用操作实际底层IO操作的Unsafe对象,在该对象中完成对DefaultChannelPipeline的fireChannelRegistered方法调用,此时将会由流水线对象完成该输入事件的调用,从而执行ChannelInitializer的channelRegistered方法。
public class DefaultChannelPipeline implements ChannelPipeline {
// 触发通道注册事件
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
// 高内聚,HeadContext和TailContext均为内部类
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
// 处理实际底层IO操作
protected abstract class AbstractUnsafe implements Unsafe {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
register0(promise);
...
}
}
private void register0(ChannelPromise promise) {
...
pipeline.fireChannelRegistered();
...
}
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
}
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
...
ChannelFuture regFuture = config().group().register(channel);
...
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112