# Netty 核心原理五 AbstractScheduledEventExecutor 原理

DefaultEventExecutorGroup 原理

由于绝大部分工作都在MultithreadEventExecutorGroup中实现了,那么可以看到该类仅仅只是在构造函数中对父类进行初始化,同时实现了唯一的newChild方法,在该方法中创建子执行器实例为DefaultEventExecutor对象。源码描述如下:

public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {



 public DefaultEventExecutorGroup(int nThreads) {

  this(nThreads, null);

 }



 // 构造函数:提供了默认的maxPendingTasks参数:SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS表示等待执行的任务个数

 public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {

  this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,

   RejectedExecutionHandlers.reject());

 }



 public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks,

         RejectedExecutionHandler rejectedHandler) {

  super(nThreads, threadFactory, maxPendingTasks, rejectedHandler);

 }



 // 通过父类的源码我们知道,传递的参数args(这里为maxPendingTasks)将会传递到该方法,由子执行器DefaultEventExecutor来使用。为何?父执行器组仅仅用于管理子执行器(创建、选择、销毁),而真实业务逻辑将在子执行器中完成

 protected EventExecutor newChild(Executor executor, Object... args) throws Exception {

  return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

AbstractEventExecutor 原理

我们看到该类同AbstractEventExecutorGroup一样,实现了一些默认的功能方法。这里需要注意:

  1. 在shutdownGracefully方法中提供了默认的静默期时间、关闭超时时间
  2. 提交执行方法由父类AbstractExecutorService完成
  3. 默认不支持周期性调度方法
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {



 static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; // 静默期

 static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; // 关闭超时时间



 private final EventExecutorGroup parent;

 private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);



 protected AbstractEventExecutor() {

  this(null);

 }



 protected AbstractEventExecutor(EventExecutorGroup parent) {

  this.parent = parent;

 }



 @Override

 public EventExecutorGroup parent() {

  return parent;

 }



 // 由于子执行器组只有他自己,所以永远返回他自己

 @Override

 public EventExecutor next() {

  return this;

 }



 // 判断当前线程是否为当前EventExecutor的执行线程,由子类来完成

 @Override

 public boolean inEventLoop() {

  return inEventLoop(Thread.currentThread());

 }



 @Override

 public Iterator<EventExecutor> iterator() {

  return selfCollection.iterator();

 }



 // 关闭方法由子类来完成,这里指定了默认的静默周期和关闭超时时间

 @Override

 public Future<?> shutdownGracefully() {

  return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);

 }



 

 /* ---- 异步执行创建默认的实例 */

 

 @Override

 public <V> Promise<V> newPromise() {

  return new DefaultPromise<V>(this);

 }



 @Override

 public <V> ProgressivePromise<V> newProgressivePromise() {

  return new DefaultProgressivePromise<V>(this);

 }



 @Override

 public <V> Future<V> newSucceededFuture(V result) {

  return new SucceededFuture<V>(this, result);

 }



 @Override

 public <V> Future<V> newFailedFuture(Throwable cause) {

  return new FailedFuture<V>(this, cause);

 }



 /* ---- 提交方法使用父类AbstractExecutorService来实现 */



 @Override

 public Future<?> submit(Runnable task) {

  return (Future<?>) super.submit(task);

 }



 @Override

 public <T> Future<T> submit(Runnable task, T result) {

  return (Future<T>) super.submit(task, result);

 }



 @Override

 public <T> Future<T> submit(Callable<T> task) {

  return (Future<T>) super.submit(task);

 }



 /* ---- 默认不支持周期性调度 */

 @Override

 public ScheduledFuture<?> schedule(Runnable command, long delay,

          TimeUnit unit) {

  throw new UnsupportedOperationException();

 }



 @Override

 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {

  throw new UnsupportedOperationException();

 }



 @Override

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {

  throw new UnsupportedOperationException();

 }



 @Override

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {

  throw new UnsupportedOperationException();

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211

AbstractScheduledEventExecutor 原理

该方法中定义了周期性任务队列scheduledTaskQueue和周期性任务调度功能方法的实现,用于对队列进行CRUD。同时提供ScheduledExecutorService中的函数实现。由于该类比较简单且功能单一,方法实现也不复杂,所以笔者这里这接将整个类都复制过来,然后进行了方法注释。读者这里只需要注意以下知识:

  1. 队列由PriorityQueue类实现
  2. 只有当前执行器所属线程才能对队列进行操作,这时保证线程安全(保证线程安全的两种方法:1、多线程操作上锁 2、单线程操作,其他线程将操作放入单线程的普通任务队列中,由单线程串行化实现,混沌学习下:Redis?)
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

 

 // 周期性任务调度队列

 Queue<ScheduledFutureTask<?>> scheduledTaskQueue;



 // 获取基于ScheduledFutureTask类加载时间的相对时间

 protected static long nanoTime() { 

  return ScheduledFutureTask.nanoTime();

 }



 // 获取或者初始化任务调度队列,可以看到这里的队列很简单:PriorityQueue由数组实现的小顶堆优先级队列

 Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {

  if (scheduledTaskQueue == null) {

   scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();

  }

  return scheduledTaskQueue;

 }



 // 取消所有周期性调度任务

 protected void cancelScheduledTasks() {

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;

  if (isNullOrEmpty(scheduledTaskQueue)) { // 队列为空,那么直接退出

   return;

  }

  final ScheduledFutureTask<?>[] scheduledTasks =

   scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]); // 导出所有周期性任务

  

  for (ScheduledFutureTask<?> task: scheduledTasks) { // 遍历取消这些任务

   task.cancelWithoutRemove(false);

  }

  scheduledTaskQueue.clear(); // 清空任务队列

 }



 // 从周期性任务队列中获取可以执行的周期性任务

 protected final Runnable pollScheduledTask(long nanoTime) {

  assert inEventLoop(); // 执行该方法的线程必须为当前执行器所属线程

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;

  // 从队列中看看是否存在任务,如果不存在,那么直接返回

  ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();

  if (scheduledTask == null) {

   return null;

  }

  // 否则看看周期周期性调度任务的等待时间是否小于传入的等待时间,如果小于那么获取该任务返回。注意:这里的remove不会导致阻塞

  if (scheduledTask.deadlineNanos() <= nanoTime) {

   scheduledTaskQueue.remove();

   return scheduledTask;

  }

  return null;

 }



 // 从队列中获取到下一次需要执行的周期性任务的等待时间

 protected final long nextScheduledTaskNano() {

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;

  // 获取第一个需要执行的任务(由于这里队列是小顶堆实现,所以第一个任务一定是最近一个需要执行的周期性任务),如任务不存在,那么返回-1,否则返回需要等待的时间

  ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); 

  if (scheduledTask == null) {

   return -1;

  }

  return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());

 }



 // 直接获取周期性任务队列中第一个需要执行的任务,不存在则返回null

 final ScheduledFutureTask<?> peekScheduledTask() {

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;

  if (scheduledTaskQueue == null) {

   return null;

  }

  return scheduledTaskQueue.peek();

 }



 // 查看任务队列中是否存在可执行的任务:任务存在且已经达到执行的时间

 protected final boolean hasScheduledTasks() {

  Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;

  ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();

  return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();

 }



 // 周期性调度实现,可以看到该方法对参数校验后直接调用schedule方法完成

 @Override

 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

  ObjectUtil.checkNotNull(command, "command");

  ObjectUtil.checkNotNull(unit, "unit");

  if (delay < 0) {

   throw new IllegalArgumentException(

    String.format("delay: %d (expected: >= 0)", delay));

  }

  return schedule(new ScheduledFutureTask<Void>(

   this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));

 }



 // 对callable接口支持

 @Override

 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {

  ObjectUtil.checkNotNull(callable, "callable");

  ObjectUtil.checkNotNull(unit, "unit");

  if (delay < 0) {

   throw new IllegalArgumentException(

    String.format("delay: %d (expected: >= 0)", delay));

  }

  return schedule(new ScheduledFutureTask<V>(

   this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));

 }

 

 // 固定周期执行方法支持

 @Override

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {

  ObjectUtil.checkNotNull(command, "command");

  ObjectUtil.checkNotNull(unit, "unit");

  if (initialDelay < 0) {

   throw new IllegalArgumentException(

    String.format("initialDelay: %d (expected: >= 0)", initialDelay));

  }

  if (period <= 0) {

   throw new IllegalArgumentException(

    String.format("period: %d (expected: > 0)", period));

  }



  return schedule(new ScheduledFutureTask<Void>(

   this, Executors.<Void>callable(command, null),

   ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));

 }



 // 固定周期执行方法支持

 @Override

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {

  ObjectUtil.checkNotNull(command, "command");

  ObjectUtil.checkNotNull(unit, "unit");

  if (initialDelay < 0) {

   throw new IllegalArgumentException(

    String.format("initialDelay: %d (expected: >= 0)", initialDelay));

  }

  if (delay <= 0) {

   throw new IllegalArgumentException(

    String.format("delay: %d (expected: > 0)", delay));

  }



  return schedule(new ScheduledFutureTask<Void>(

   this, Executors.<Void>callable(command, null),

   ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));

 }



 // 完整实现周期性调度方法

 <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {

  if (inEventLoop()) { // 如果当前线程不在事件循环组,那么加入周期性任务队列(只能由当前执行器的线程来完成队列添加操作,这样就可以保证所有操作均由一个线程完成,此时保证了线程安全)

   scheduledTaskQueue().add(task);

  } else { // 否则直接调用execute方法执行(因为我们这里是Executor接口的实例,所以由线程执行的方法当然为Executor接口中定义的execute方法),在该方法中将任务放置到周期性任务队列

   execute(new Runnable() {

    @Override

    public void run() {

     scheduledTaskQueue().add(task);

    }

   });

  }

  return task;

 }



 // 将指定的任务task从周期性调度任务队列中移除,同样,这里只能有执行器所属的执行线程来完成该操作,保证线程安全

 final void removeScheduled(final ScheduledFutureTask<?> task) {

  if (inEventLoop()) {

   scheduledTaskQueue().remove(task);

  } else {

   execute(new Runnable() {

    @Override

    public void run() {

     removeScheduled(task);

    }

   });

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339