# Netty 核心原理八 SingleThreadEventExecutor 原理三

核心方法awaitTermination实现原理

该方法用于外部线程等待当前执行器关闭,timeout指定等待超时时间。源码描述如下。

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {

 if (unit == null) {

  throw new NullPointerException("unit");

 }

 if (inEventLoop()) { // 不能自己等待自己完成(也即当前线程是当前事件执行器的执行线程)

  throw new IllegalStateException("cannot await termination of the current thread");

 }

 if (threadLock.tryAcquire(timeout, unit)) { // 通过线程信号量完成等待(Semaphore threadLock = new Semaphore(0),由于信号量初始化为0,那么当执行线程调用release时将会唤醒等待线程)

  threadLock.release(); // 成功后需要释放信号量,此时如果还有别的线程正在等待信号量,那么将会被唤醒

 }

 return isTerminated(); // 检测是否处于ST_TERMINATED终止状态(我们在前面doStartThread方法中看到:将会在finally块中将state转变为ST_TERMINATED)

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

核心方法takeTask实现原理

可以看到该方法的权限修饰符为protected,所以必然是子类进行调用,通过方法名也很容易知道该方法的作用:从任务队列中获取待执行的任务。流程如下:

  1. 检测任务队列必须为阻塞队列,因为保证线程安全,同时可以在没有任务可获取时阻塞等待任务
  2. 调用peekScheduledTask方法,查看周期性任务调度队列的第一个待执行任务
  3. 如果周期性任务不存在,那么从普通任务队列中获取任务执行。此时调用taskQueue.take(),所以当不存在任务时会阻塞。如果存在任务,那么如果任务为唤醒任务,那么直接设置为空返回即可
  4. 如果周期性任务存在,那么获取等待时间。如果等待时间大于0,那么此时在等待期间去普通任务队列里获取任务执行。否则,此时等待时间小于等于0,表明周期性任务已经可以执行了,那么此时将周期性任务调度队列中已经达到执行时间的任务放入普通任务队列,然后从队列中获取任务,然后返回
protected Runnable takeTask() {

 assert inEventLoop();

 if (!(taskQueue instanceof BlockingQueue)) { // 任务队列必须为阻塞队列,因为保证线程安全,同时可以在没有任务可获取时阻塞等待任务

  throw new UnsupportedOperationException();

 }

 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;

 for (;;) {

  // 查看周期性任务调度队列的第一个待执行任务

  ScheduledFutureTask<?> scheduledTask = peekScheduledTask();

  if (scheduledTask == null) { // 如果周期性任务不存在,那么从普通任务队列中获取任务执行

   Runnable task = null;

   try {

    task = taskQueue.take(); // 获取普通任务队列中的任务,我们看到这里为take方法,所以当不存在任务时会阻塞

    if (task == WAKEUP_TASK) { // 如果任务为唤醒任务,那么直接设置为空返回即可

     task = null;

    }

   } catch (InterruptedException e) {

    // Ignore

   }

   return task;

  } else { // 周期性任务存在,那么获取等待时间

   long delayNanos = scheduledTask.delayNanos();

   Runnable task = null;

   if (delayNanos > 0) { // 等待时间大于0,那么此时在等待期间去普通任务队列里获取任务执行

    try {

     task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);

    } catch (InterruptedException e) {

     return null;

    }

   }

   if (task == null) { // 如果此时任务为空,表明在等待的时间段内,没有新任务放入或者周期性任务已经可以执行。那么此时将周期性任务调度队列中已经达到执行时间的任务放入普通任务队列,然后从队列中获取放入的任务执行(注意:这里是netty修复的一个bug,读者可以考虑下如果不移动周期性任务到普通任务队列,那么当taskqueue中总是存在一个任务执行时,周期性任务将永远得不到执行)

    /** 这时没修复前的逻辑,可以看到不管delayNanos的值如何,这里都将从taskQueue队列中获取任务,如果该队列中始终存在一个任务,那么这个达到执行时间的周期性任务将不会得到执行

    Runnable task;

    if (delayNanos > 0) {

     try {

      task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);

     } catch (InterruptedException e) {

      // Waken up.

      return null;

     }

    } else { // 这里是关键(必须去掉,因为此时delayNanos<=0,已经可以执行了)

     task = taskQueue.poll();

    }

    if (task == null) {

     fetchFromScheduledTaskQueue();

     task = taskQueue.poll();

    }

    **/

    fetchFromScheduledTaskQueue();

    task = taskQueue.poll();

   }

   if (task != null) { // 任务存在,那么直接返回

    return task;

   }

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

核心方法shutdownGracefully实现原理

shutdownGracefully方法用于关闭事件执行器。当然,我们知道事件执行器最终是通过封装Runnable对象放入到Executor中执行,这时Executor将启动一个线程来执行事件执行器中的run方法。我们来看看该方法如何关闭执行器,流程如下:

  1. 对静默期持续时间、等待关闭超时时间、超时时间单位进行参数校验
  2. 如果已经处于关闭中状态,那么直接返回terminationFuture
  3. 循环等待状态变为关闭中状态。在循环中执行以下操作:
  4. 判断已经处于关闭中,那么返回terminationFuture
  5. 如果当前执行器线程为事件执行线程,那么直接修改newState状态为ST_SHUTTING_DOWN,否则根据当前状态来修改,可以看到此时如果处于ST_NOT_STARTED(未启动)、ST_STARTED(已经启动)那么直接修改为ST_SHUTTING_DOWN,其他状态均不变
  6. 原子性更新状态
  7. 将静默期,超时时间转为实例变量
  8. 如果之前的状态如果为未启动状态,那么这里启动线程处理关闭状态
  9. 如果线程已经处于关闭状态,那么不需要唤醒,其他时候需要唤醒线程来处理关闭状态
  10. 返回terminationFuture
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

 // 参数校验

 if (quietPeriod < 0) {

  throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");

 }

 if (timeout < quietPeriod) {

  throw new IllegalArgumentException(

   "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");

 }

 if (unit == null) {

  throw new NullPointerException("unit");

 }

 // 已经处于关闭中状态,那么直接返回terminationFuture,调用方可以使用该Future来等待状态转为TS_TERMINATE

 if (isShuttingDown()) {

  return terminationFuture();

 }

 boolean inEventLoop = inEventLoop(); // 判断当前调用线程是否为事件执行器的执行线程

 boolean wakeup;

 int oldState;

 for (;;) { // 循环等待关闭

  if (isShuttingDown()) { // 已经处于关闭中,那么返回Future

   return terminationFuture();

  }

  int newState;

  wakeup = true;

  oldState = STATE_UPDATER.get(this); // 获取当前状态

  if (inEventLoop) { // 如果当前执行器线程为事件执行线程,那么直接修改newState状态为ST_SHUTTING_DOWN表明处于关闭状态(注意:这里能直接修改,因为当前为执行线程,既然执行到该方法,那么肯定当前状态为ST_STARTED,所以直接修改即可)

   newState = ST_SHUTTING_DOWN;

  } else {

   // 否则根据当前状态来修改,可以看到此时如果处于ST_NOT_STARTED(未启动)、ST_STARTED(已经启动)那么直接修改为ST_SHUTTING_DOWN,其他状态均不变(读者可以想想为何?如果状态已经处于关闭中、关闭、终止,那么不需要修改)

   switch (oldState) {

    case ST_NOT_STARTED:

    case ST_STARTED:

     newState = ST_SHUTTING_DOWN;

     break;

    default:

     newState = oldState;

     wakeup = false;

   }

  }

  if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { // 原子性更新状态

   break;

  }

 }

 // 将静默期,超时时间转为实例变量

 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);

 gracefulShutdownTimeout = unit.toNanos(timeout);

 if (oldState == ST_NOT_STARTED) { // 之前的状态如果为未启动状态,那么这里启动线程

  doStartThread();

 }

 if (wakeup) { // 如果线程已经处于关闭状态,那么不需要唤醒,其他时候需要唤醒线程来处理关闭状态

  wakeup(inEventLoop);

 }

 return terminationFuture();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109