# Netty 核心原理七 SingleThreadEventExecutor 原理二

核心方法confirmShutdown实现原理

该方法用于确保事件处理器处于关闭状态。流程如下:

  1. 检测state状态是否处于 ST_SHUTTING_DOWN 正在关闭状态
  2. 检测当前调用该方法的线程不是正在执行该事件执行器的线程
  3. 取消所有周期性调度任务
  4. 更新gracefulShutdownStartTime关闭开始时间
  5. 执行所有任务队列中的任务成功或者执行钩子函数成功,那么检测当前状态是否已经处于关闭状态,如果处于关闭状态,那么直接返回true,因为此时执行器已经关闭,并且没有任何新任务需要执行。否则调用wakeup函数,唤醒线程并返回false
  6. 获取当前时间并检测状态state是否处于ST_SHUTTING_DOWN关闭状态,如果已经关闭,那么直接返回true。否则看看当前shutdown关闭的时间是否已经超过关闭超时时间,如果是,那么返回true
  7. 检测最后一次执行任务的时间小于设置的关闭静默期时间,那么这时调用wakeup唤醒线程继续执行任务,这里睡眠100ms,表示在静默期内每100ms检测是否有任务放入队列执行
  8. 如果在静默期内,没有任务放入队列执行,那么可以返回true,安全shutdown
protected boolean confirmShutdown() {

 if (!isShuttingDown()) { // 检测状态是否处于正在关闭状态:STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN

  return false;

 }

 if (!inEventLoop()) { // 当前调用该方法的线程不是正在执行该事件执行器的线程,那么抛出异常

  throw new IllegalStateException("must be invoked from an event loop");

 }

 cancelScheduledTasks(); // 取消所有周期性调度任务

 if (gracefulShutdownStartTime == 0) { // 更新关闭开始时间

  gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();

 }

 if (runAllTasks() || runShutdownHooks()) { // 执行所有任务队列中的任务成功或者执行钩子函数成功,那么检测当前状态是否已经处于关闭状态,如果处于关闭状态,那么直接返回true,因为此时执行器已经关闭,并且没有任何新任务需要执行

  if (isShutdown()) {

   return true;

  }

  // 当然,如果队列中仍有任务,那么等待一段时间,直到在静默期时间段内没有任务放入队列 

  wakeup(true);

  return false;

 }

 final long nanoTime = ScheduledFutureTask.nanoTime(); // 获取当前时间

 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { // 检测状态如果已经关闭,那么直接返回true,否则看看当前shutdown关闭的时间是否已经超过关闭超时时间,如果是,那么返回true

  return true;

 }

 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // 最后一次执行任务的时间小于设置的关闭静默期时间,那么这时调用wakeup唤醒线程继续执行任务,这里睡眠100ms,表示在静默期内每100ms检测是否有任务放入队列执行

  wakeup(true);

  try {

   Thread.sleep(100);

  } catch (InterruptedException e) {

   // Ignore

  }

  return false;

 }

 // 在静默期内没有任务放入队列,那么直接返回true,安全shutdown关闭

 return true;

}



// 取消所有周期性调度任务

protected void cancelScheduledTasks() {

 assert inEventLoop();

 // 获取周期性任务队列,如果队列为空,那么直接返回

 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; 

 if (isNullOrEmpty(scheduledTaskQueue)) {

  return;

 }

 // 将周期性任务队列中的任务转为数组,并遍历该数组将其取消执行

 final ScheduledFutureTask<?>[] scheduledTasks =

  scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);

 for (ScheduledFutureTask<?> task: scheduledTasks) { 

  task.cancelWithoutRemove(false);

 }

 scheduledTaskQueue.clear(); // 将周期性任务队列清空

}



// 唤醒线程函数,我们看到这里判断执行器状态后向队列放入唤醒线程的空任务,因为此时线程可能处于阻塞队列获取任务状态,通过向其中放入空任务,可以将线程唤醒

protected void wakeup(boolean inEventLoop) {

 if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {

  taskQueue.offer(WAKEUP_TASK);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

核心方法runAllTasks实现原理

该方法用于执行普通任务队列中的所有任务。流程如下:

  1. 将当前需要执行的周期性任务队列中的任务放入普通任务队列taskQueue
  2. 执行taskQueue中的所有任务,知道周期性任务队列和taskQueue为空
  3. 更新最终执行时间
  4. 调用子类实现的钩子函数afterRunningAllTasks
protected boolean runAllTasks() {

 assert inEventLoop();

 boolean fetchedAll;

 boolean ranAtLeastOne = false;

 do {

  fetchedAll = fetchFromScheduledTaskQueue(); 

  if (runAllTasksFrom(taskQueue)) {

   ranAtLeastOne = true;

  }

 } while (!fetchedAll); // 继续处理,直到我们获取所有的计划任务

 if (ranAtLeastOne) { // 更新最终执行时间

  lastExecutionTime = ScheduledFutureTask.nanoTime();

 }

 afterRunningAllTasks(); // 调用子类实现的钩子函数

 return ranAtLeastOne; 

}



// 将当前需要执行的周期性任务调度任务从周期性任务队列中移动到普通任务队列taskQueue

private boolean fetchFromScheduledTaskQueue() {

 long nanoTime = AbstractScheduledEventExecutor.nanoTime();

 Runnable scheduledTask = pollScheduledTask(nanoTime); // 获取当前已经到期的周期性任务

 while (scheduledTask != null) {

  if (!taskQueue.offer(scheduledTask)) { // 将其放入普通任务队列

   // 如果任务队列中没有剩余空间,将它放回scheduledTaskQueue,这样我们就可以等待普通任务队列有空间后再次提取它

   scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);

   return false;

  }

  scheduledTask = pollScheduledTask(nanoTime); // 获取下一个周期性任务

 }

 return true;

}



// 执行taskQueue中的所有任务

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {

 Runnable task = pollTaskFrom(taskQueue); // 从队列中提取任务

 if (task == null) {

  return false;

 }

 for (;;) {

  safeExecute(task); // 执行任务

  task = pollTaskFrom(taskQueue); // 提取下一个任务

  if (task == null) {

   return true;

  }

 }

}



// 使用try catch来预防发生异常导致线程退出

private static void safeExecute(Runnable task) {

 try {

  task.run();

 } catch (Throwable t) {

  logger.warn("A task raised an exception. Task: {}", task, t);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

核心方法runShutdownHooks实现原理

该方法用于执行所有注册的执行器关闭时执行的钩子函数。源码描述如下。

private boolean runShutdownHooks() {

 boolean ran = false;

 while (!shutdownHooks.isEmpty()) { // 执行所有钩子直到队列为空

  List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks); // 将当前所有钩子复制快照到当前copy引用,避免在执行中新增加hook

  shutdownHooks.clear(); // 清空队列

  for (Runnable task: copy) { // 遍历钩子队列中任务执行

   try {

    task.run();

   } catch (Throwable t) {

    logger.warn("Shutdown hook raised an exception.", t);

   } finally {

    ran = true;

   }

  }

 }

 if (ran) { // 执行成功更新执行器最后一次执行任务的时间

  lastExecutionTime = ScheduledFutureTask.nanoTime();

 }

 return ran;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39