# Netty 核心原理十 ThreadPerChannelEventLoopGroup原理

ThreadPerChannelEventLoopGroup 将会为每一个通道创建一个自己的EventLoop实例,我们看到这种特性就是BIO的特性,当然我们在后面描述oio时将会用到该类。读者注意:该类在生产线上慎用(通常不用),因为无法利用NIO的特性,一个通道一个线程将会导致系统性能极度下降。

核心变量与构造器

通过变量的定义,很容易的理解到以下几点:

  1. 该类对于每个通道都使用一个EventLoop对象来处理
  2. 由于通道数可能非常大,这时可以通过设置maxChannels来限制并发的通道数
  3. 当通道数达到最大时,将会抛出ChannelException tooManyChannels异常
  4. 我们创建activeChildren和idleChildren来缓存EventLoop对象,提升性能
public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {

 private final Object[] childArgs; // 传递给管理的EventLoop对象使用参数

 private final int maxChannels; // 最大通道数

 final Executor executor; // 执行器对象

 final Set<EventLoop> activeChildren =

  Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap()); // 管理活动的EventLoop对象

 final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>(); // 管理空闲的EventLoop对象

 private final ChannelException tooManyChannels; // 在通道数量超过最大通道数时抛出的异常对象



 private volatile boolean shuttingDown; // 是否处于 关闭中 状态

 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE); // 关闭结果Promise,调用方可以根据该对象来判断当前执行器组是否已经终止

 private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {

  @Override

  public void operationComplete(Future<Object> future) throws Exception {

   if (isTerminated()) {

    terminationFuture.trySuccess(null);

   }

  }

 }; // EventLoop对象终止后回调的监听器

 

 // 构造方法:设置maxChannels最大通道数为0,表示无限制

 protected ThreadPerChannelEventLoopGroup() {

  this(0);

 }

 

 // 构造方法:设置maxChannels最大通道数为maxChannels,并提供了默认工厂函数

 protected ThreadPerChannelEventLoopGroup(int maxChannels) {

  this(maxChannels, Executors.defaultThreadFactory());

 }

 

 // 构造方法:设置maxChannels最大通道数为maxChannels,并提供了默认执行器(每个任务创建一个线程来执行)

 protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {

  this(maxChannels, new ThreadPerTaskExecutor(threadFactory), args);

 }



 // 完整构造器:对参数进行初始化

 protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {

  if (maxChannels < 0) {

   throw new IllegalArgumentException(String.format(

    "maxChannels: %d (expected: >= 0)", maxChannels));

  }

  if (executor == null) {

   throw new NullPointerException("executor");

  }



  if (args == null) {

   childArgs = EmptyArrays.EMPTY_OBJECTS;

  } else {

   childArgs = args.clone();

  }



  this.maxChannels = maxChannels;

  this.executor = executor;



  tooManyChannels = ThrowableUtil.unknownStackTrace(

   new ChannelException("too many channels (max: " + maxChannels + ')'),

   ThreadPerChannelEventLoopGroup.class, "nextChild()");

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

核心方法newChild原理

该方法将创建管理的EventLoop对象。可以看到创建的类型为ThreadPerChannelEventLoop对象。源码描述如下。

protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {

 return new ThreadPerChannelEventLoop(this);

}
1
2
3
4
5

核心方法next原理

由于该类对于每个通道都由一个EventLoop对象来执行,所以这里的next方法将不支持操作。源码描述如下。

public EventLoop next() {

 throw new UnsupportedOperationException();

}
1
2
3
4
5

核心方法shutdownGracefully原理

该方法用于关闭事件循环组。流程如下:

  1. 设置shuttingDown为true,标识已经处于关闭中状态
  2. 关闭所有处于活动中的EventLoop
  3. 关闭所有空闲的EventLoop
  4. 如果此时状态已经变为终止状态,那么设置terminationFuture执行成功
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

 shuttingDown = true; // 标识已经处于关闭中状态 

 for (EventLoop l: activeChildren) { // 关闭所有处于活动中的EventLoop

  l.shutdownGracefully(quietPeriod, timeout, unit);

 }

 for (EventLoop l: idleChildren) { // 关闭所有空闲的EventLoop

  l.shutdownGracefully(quietPeriod, timeout, unit);

 }



 // 如果此时状态已经变为终止状态,那么设置terminationFuture执行成功,那么此时调用方可以通过terminationFuture来获取终止状态

 if (isTerminated()) {

  terminationFuture.trySuccess(null);

 }

 return terminationFuture();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

核心方法awaitTermination原理

该方法用于超时等待事件循环组终止。流程如下:

  1. 计算超时时间
  2. 遍历所有活动的EventLoop,调用他们的awaitTermination
  3. 遍历所有空闲的EventLoop,调用他们的awaitTermination
public boolean awaitTermination(long timeout, TimeUnit unit)

 throws InterruptedException {

 long deadline = System.nanoTime() + unit.toNanos(timeout); // 计算超时时间

 for (EventLoop l: activeChildren) { // 遍历所有活动的EventLoop,调用他们的awaitTermination

  for (;;) {

   long timeLeft = deadline - System.nanoTime();

   if (timeLeft <= 0) { // 检查是否超时

    return isTerminated();

   }

   if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {

    break;

   }

  }

 }

 for (EventLoop l: idleChildren) { // 遍历所有空闲的EventLoop,调用他们的awaitTermination

  for (;;) {

   long timeLeft = deadline - System.nanoTime();

   if (timeLeft <= 0) { // 检查是否超时

    return isTerminated();

   }

   if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {

    break;

   }

  }

 }

 return isTerminated();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

核心方法register原理

该系列方法将把传递进来的Channel通道注册到管理的EventLoop中。我们看到核心方法就是nextChild,该方法将返回一个可用的EventLoop,然后调用其register方法完成注册。只不过区别如下:

  1. register(Channel channel)在注册时创建了一个DefaultChannelPromise对通道进行包装
  2. register(ChannelPromise promise)直接将ChannelPromise 传递给l.register方法
public ChannelFuture register(Channel channel) {

 if (channel == null) {

  throw new NullPointerException("channel");

 }

 try {

  EventLoop l = nextChild();

  return l.register(new DefaultChannelPromise(channel, l));

 } catch (Throwable t) { 

  return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);

 }

}



@Override

public ChannelFuture register(ChannelPromise promise) {

 try {

  return nextChild().register(promise);

 } catch (Throwable t) { 

  promise.setFailure(t);

  return promise;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

核心方法nextChild原理

上面我们看到该方法将会返回一个可用的EventLoop。流程如下:

  1. 判断状态是否已经处于关闭中,此时将抛出任务拒绝异常
  2. 尝试从空闲的队列中获取EventLoop。如果获取失败,那么判断是否达到最大通道数,如果达到,那么抛出tooManyChannels异常,否则创建一个新的EventLoop将其添加到activeChildren活动对象集中,并返回该EventLoop
private EventLoop nextChild() throws Exception {

 if (shuttingDown) {

  throw new RejectedExecutionException("shutting down");

 }



 EventLoop loop = idleChildren.poll();

 if (loop == null) { // 获取失败

  if (maxChannels > 0 && activeChildren.size() >= maxChannels) { // 检查是否达到最大通道限制

   throw tooManyChannels;

  }

  loop = newChild(childArgs); // 创建新的EventLoop

  loop.terminationFuture().addListener(childTerminationListener);

 }

 activeChildren.add(loop); // 添加到活动集

 return loop;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31