# Netty 核心原理十三 NioEventLoop 原理二

核心方法rebuildSelector原理

该方法将在select方法中对Selector空转的bug进行处理,这里处理方式为:重新构建Selector对象。流程如下:

  1. 如果当前执行线程不是事件循环组的线程,那么向队列中提交任务来让执行线程执行重新构建操作
  2. 判断当前选择器不存在,那么直接返回
  3. 通过SelectorProvider创建一个新的选择器实例
  4. 将旧Selector中注册的通道注册到新的Selector中
public void rebuildSelector() {

 if (!inEventLoop()) {

  execute(new Runnable() {

   @Override

   public void run() {

    rebuildSelector();

   }

  });

  return;

 }

 

 final Selector oldSelector = selector;

 final Selector newSelector;



 if (oldSelector == null) { // 当前选择器不存在,那么直接返回

  return;

 }



 try {

  newSelector = openSelector(); // 通过SelectorProvider创建一个新的选择器实例

 } catch (Exception e) {

  logger.warn("Failed to create a new Selector.", e);

  return;

 }

 // 将旧Selector中注册的通道注册到新的Selector中

 int nChannels = 0;

 for (;;) {

  try {

   for (SelectionKey key: oldSelector.keys()) { // 遍历之前注册的通道选择集

    Object a = key.attachment(); // 获取与SelectionKey绑定的对象

    try {

     if (!key.isValid() || key.channel().keyFor(newSelector) != null) { // 避免重复注册

      continue;

     }

     int interestOps = key.interestOps(); // 获取之前注册的感兴趣事件集

     key.cancel(); // 将key从旧的选择器中取消

     SelectionKey newKey = key.channel().register(newSelector, interestOps, a); // 重新将通道注册到新的Selector中,注意这里需要携带:之前的感兴趣事件集、绑定对象

     if (a instanceof AbstractNioChannel) {

      // 如果绑定的对象是AbstractNioChannel实例,那么需要将其中保存的选择键更新

      ((AbstractNioChannel) a).selectionKey = newKey;

     }

     nChannels ++; 

    } catch (Exception e) { // 发生异常,关闭通道

     logger.warn("Failed to re-register a Channel to the new Selector.", e);

     if (a instanceof AbstractNioChannel) {

      AbstractNioChannel ch = (AbstractNioChannel) a;

      ch.unsafe().close(ch.unsafe().voidPromise());

     } else {

      @SuppressWarnings("unchecked")

      NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

      invokeChannelUnregistered(task, key, e);

     }

    }

   }

  } catch (ConcurrentModificationException e) {

   continue;

  }

  break;

 }

 selector = newSelector; // 更新选择器对象

 try {

  // 关闭旧的选择器

  oldSelector.close();

 } catch (Throwable t) {

  if (logger.isWarnEnabled()) {

   logger.warn("Failed to close the old Selector.", t);

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137

核心方法processSelectedKeys原理

该方法用于执行Selector中注册的通道发生的事件:可读、可写、新连接。可以看到这里如果使用了优化的选择集,那么调用processSelectedKeysOptimized方法执行,否则使用普通的processSelectedKeysPlain方法来执行准备好的事件。执行流程如下:

  1. 获取需要处理的SelectionKey,并将其从publicSelectedKeys中移除
  2. 根据携带对象的类型来选择处理,如果是AbstractNioChannel的子类那么直接调用方法processSelectedKey执行,否则先将携带对象转为NioTask再调用方法执行
  3. 如果设置了needsToSelectAgain标志位,那么在处理完当前选择键后需要再次选择

可以看到这里的优化操作主要是对原生的HashSet进行优化,这里直接操作数组索引下标,不需要计算hash,同时也不使用迭代器对象。源码描述如下。

private void processSelectedKeys() {

 if (selectedKeys != null) {

  processSelectedKeysOptimized(selectedKeys.flip());

 } else {

  processSelectedKeysPlain(selector.selectedKeys());

 }

}



// 使用优化的选择键集执行

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {

 for (int i = 0;; i ++) {

  final SelectionKey k = selectedKeys[i];

  if (k == null) { // 检测数组项是否为空

   break;

  }

  selectedKeys[i] = null; // 直接将数组项设置为空



  // 根据携带对象的类型来选择处理

  final Object a = k.attachment();

  if (a instanceof AbstractNioChannel) {

   processSelectedKey(k, (AbstractNioChannel) a);

  } else {

   @SuppressWarnings("unchecked")

   NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

   processSelectedKey(k, task);

  }



  // 设置需要再次选择,那么此时将原有的selectedKeys数组中当前i后面的selectedKey清空,然后执行再次选择,并将selectedKeys中的数组进行切换,此时将获得正确的selectedKeys数组并且重置index循环下标为-1,当开始循环时设置为0,表示重新从数组最开始执行

  if (needsToSelectAgain) {

   for (;;) {

    i++;

    if (selectedKeys[i] == null) {

     break;

    }

    selectedKeys[i] = null;

   }

   selectAgain();

   selectedKeys = this.selectedKeys.flip();

   i = -1;

  }

 }

}



// 使用普通的选择键集来完成处理

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {

 if (selectedKeys.isEmpty()) { // 键集为空,则返回

  return;

 }

 Iterator<SelectionKey> i = selectedKeys.iterator(); // 获取迭代器对象

 for (;;) {

  // 获取需要处理的SelectionKey,并将其从publicSelectedKeys中移除

  final SelectionKey k = i.next();

  final Object a = k.attachment();

  i.remove();



  if (a instanceof AbstractNioChannel) { // 处理AbstractNioChannel的子类

   processSelectedKey(k, (AbstractNioChannel) a);

  } else { // 处理NioTask

   NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

   processSelectedKey(k, task);

  }

  if (!i.hasNext()) { // 迭代器处理完成,那么结束循环

   break;

  }

  if (needsToSelectAgain) { // 如果设置了在处理完选择键后需要再次选择,然后继续执行,此时需要重新创建迭代器对象,避免发生ConcurrentModificationException异常

   selectAgain();

   selectedKeys = selector.selectedKeys();

   if (selectedKeys.isEmpty()) {

    break;

   } else {

    i = selectedKeys.iterator();

   }

  }

 }

}



// 重新对选择器进行选择

private void selectAgain() {

 needsToSelectAgain = false; // 还原标识变量

 try {

  selector.selectNow();

 } catch (Throwable t) {

  logger.warn("Failed to update SelectionKeys.", t);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169

核心方法processSelectedKey(SelectionKey k, AbstractNioChannel ch)原理

该方法用于处理SelectionKey 和其携带的AbstractNioChannel 子类对象。流程如下:

  1. 获取通道NioUnsafe对象
  2. 判断选择键无效而关闭通道
  3. 获取准备好的事件集
  4. 根据事件集的类型:OP_READ、OP_ACCEPT、OP_WRITE、OP_CONNECT来调用Channel的相应方法来完成处理

我们这里可以看见:所有处理方法将由Channel来完成实现。源码描述如下。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 获取通道NioUnsafe对象

 if (!k.isValid()) { // 选择键无效(选择键被cancel、channel关闭、Selector被关闭)

  final EventLoop eventLoop;

  try {

   eventLoop = ch.eventLoop(); 

  } catch (Throwable ignored) { // Channel的实现没有使用事件循环组则返回,如果没有实现该方法,那么可能抛出异常,这里我们不做处理,直接返回

   return;

  }

  if (eventLoop != this || eventLoop == null) { // Channel已经从事件循环组中解除注册,那么直接返回

   return;

  }

  unsafe.close(unsafe.voidPromise()); // 通道仍然注册在事件循环组中,那么将其关闭

  return;

 }



 try {

  int readyOps = k.readyOps(); // 获取准备好的事件集

  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 如果为OP_READ和OP_ACCEPT则处理读操作,这里同样对readyOps=0进行判断,是为了避免发生JDK的事件集空返回Bug(其实这可能是操作系统的Bug:在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒)

   unsafe.read(); // 处理读操作

   if (!ch.isOpen()) { // 通道已经关闭直接返回

    return;

   }

  }

  if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 执行通道写操作

   ch.unsafe().forceFlush();

  }

  if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 执行连接操作

   int ops = k.interestOps();

   ops &= ~SelectionKey.OP_CONNECT; // 这里去掉了选择集中的OP_CONNECT,因为如果不去掉,那么将会总是返回该事件集

   k.interestOps(ops);

   unsafe.finishConnect();

  }

 } catch (CancelledKeyException ignored) {

  unsafe.close(unsafe.voidPromise());

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

核心方法processSelectedKey(SelectionKey k, NioTask<SelectableChannel>task)原理

该方法用于处理SelectionKey 和其携带的NioTask对象。可以看到该方法主要回调task的channelReady方法来完成对通道的处理。同时根据处理结果来完成进一步的处理。源码描述如下。

private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {

 int state = 0; // 标志执行状态:1:task回调方法正常执行 2、执行发生异常

 try {

  task.channelReady(k.channel(), k); // 回调task方法

  state = 1;

 } catch (Exception e) { // 发生异常将选择键删除同时回调任务的channelUnregistered方法

  k.cancel();

  invokeChannelUnregistered(task, k, e);

  state = 2;

 } finally {

  switch (state) {

   case 0: // 发生非Exception异常,那么取消选择键同时回调任务的channelUnregistered方法

    k.cancel();

    invokeChannelUnregistered(task, k, null);

    break;

   case 1: // 正常状态,那么验证选择键是否仍然有效

    if (!k.isValid()) {

     invokeChannelUnregistered(task, k, null);

    }

    break;

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45