# Netty 核心原理十五 DefaultPromise 原理

Promise 原理

该接口也是一个特殊的Future接口,用于对Future代表的异步任务结果操作。我们看到这里面的两大核心方法:设置成功、设置失败同时保存异常结果。读者需要注意的是:不可重复的设置结果,如果在多线程间设置执行结果,那么可以使用tryXXX方法,因为该方法将不会抛出IllegalStateException异常。源码描述如下。

public interface Promise<V> extends Future<V> {



 /**

 * 设置当前Future代表的异步任务执行结果完成,同时参数result用于设置结果。如果当前结果已经被设置完成,那么将会抛出无效参数异常

 */

 Promise<V> setSuccess(V result);



 /**

 * 尝试设置当前Future代表的异步任务执行结果完成,同时参数result用于设置结果。如果当前任务已经被设置完成,那么返回false,否则返回true

 */

 boolean trySuccess(V result);



 /**

 * 设置当前Future代表的异步任务执行失败,同时参数cause用于设置异常信息。如果当前结果已经被设置完成,那么将会抛出无效参数异常

 */

 Promise<V> setFailure(Throwable cause);



 /**

 * 尝试设置当前Future代表的异步任务执行失败,同时参数cause用于设置异常信息。如果当前结果已经被设置完成,那么返回false,否则返回true

 */

 boolean tryFailure(Throwable cause);



 /**

 * 用于设置该Future代表的异步任务在执行后,是否可以被取消

 */

 boolean setUncancellable();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

DefaultPromise 原理

DefaultPromise 为Promise接口的默认实现。在其中完成了所有的接口的实现。方法较多,所以笔者将他们拆开方便读者浏览。

核心变量与构造器

通过变量定义我们得知:

  1. result异步执行结果由RESULT_UPDATER原子性更新,从而保证线程安全
  2. EventExecutor executor事件执行器用于通知监听器
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,               SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); // 最大监听器栈深度,在调用监听器时使用,用于限制调用

 

 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER; // 原子性更新结果引用对象(JUC的基础?)

 

 private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); // 当执行结果为null时,设置的成功对象

 

 private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"); // 用于标识当前执行的任务不能够被取消

 

 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(

  new CancellationException(), DefaultPromise.class, "cancel(...)")); // 用于在调用cancel方法时标识取消的占位对象



 static {

  // 初始化result属性原子操作对象

  AtomicReferenceFieldUpdater<DefaultPromise, Object> updater =

   PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result");

  RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class,

                     Object.class, "result") : updater;

 }



 private volatile Object result; // 代表执行结果

 private final EventExecutor executor; // 用于通知监听器的事件执行器对象

 private Object listeners; // 监听器对象

 private short waiters; // 等待当前任务执行完成的线程数

 private boolean notifyingListeners; // 用于标识当前正在通知监听器



 public DefaultPromise(EventExecutor executor) {

  this.executor = checkNotNull(executor, "executor");

 }

 

 // 静态内部类。用于作为异常信息占位对象

 private static final class CauseHolder {

  final Throwable cause;

  CauseHolder(Throwable cause) {

   this.cause = cause;

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

核心方法await原理

该方法用于在外部线程调用,响应中断的等待当前异步执行任务完成。流程如下:

  1. 如果任务已经完成,直接返回
  2. 如果线程被中断,那么抛出中断异常
  3. 检测死锁(自己等待自己)
  4. 在当前Promise对象上等待执行完成
// 用于检测死锁

protected void checkDeadLock() { 

 EventExecutor e = executor(); // 获取通过构造函数传入的执行器对象

 if (e != null && e.inEventLoop()) { // 检测当前线程是否为事件执行器中的线程(自己等待自己?)

  throw new BlockingOperationException(toString());

 }

}



// 判断当前任务是否执行完成

public boolean isDone() {

 return isDone0(result);

}



private static boolean isDone0(Object result) { // 结果不为空且不为不能取消的占位符

 return result != null && result != UNCANCELLABLE;

}



public Promise<V> await() throws InterruptedException {

 if (isDone()) { // 已经完成,直接返回

  return this;

 }

 if (Thread.interrupted()) { // 线程被中断,那么抛出中断异常

  throw new InterruptedException(toString());

 }

 checkDeadLock(); // 检测死锁

 synchronized (this) { // 等待在当前Promise对象上

  while (!isDone()) { // 仍为完成时等待,同时增加等待线程数

   incWaiters();

   try {

    wait();

   } finally {

    decWaiters();

   }

  }

 }

 return this;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

核心方法awaitUninterruptibly原理

该方法用于在外部线程调用,不响应中断的等待当前异步执行任务完成。流程如下:

  1. 如果任务已经完成,直接返回
  2. 检测死锁
  3. 在当前Promise对象上等待任务执行完成,如果等待过程中线程被中断,那么设置中断标识interrupted
  4. 在任务执行后,如果设置了interrupted中断标志位,那么调用Thread.currentThread().interrupt()方法重新设置线程标志位
public Promise<V> awaitUninterruptibly() {

 if (isDone()) { // 已经完成,直接返回

  return this;

 }

 checkDeadLock(); // 检测死锁

 boolean interrupted = false; // 标志位用于标识线程在等待过程中被中断

 synchronized (this) { // 等待在当前Promise对象上

  while (!isDone()) {

   incWaiters();

   try {

    wait();

   } catch (InterruptedException e) { // 与await()不同,这里捕捉了中断异常同时设置了interrupted标志位

    interrupted = true;

   } finally {

    decWaiters();

   }

  }

 }

 if (interrupted) { // 由于异常捕捉将会清除中断标志位,所以这里重新设置线程标志位

  Thread.currentThread().interrupt();

 }

 return this;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

核心方法cancel原理

该方法用于取消异步执行的任务,参数mayInterruptIfRunning表示当任务正在执行时,是否通过中断停止执行。我们看到这里直接通过compareAndSet将result结果修改为CANCELLATION_CAUSE_HOLDER,如果修改成功,那么唤醒等待任务执行完成的线程,同时通知监听器。源码描述如下。

// 如果有等待线程那么唤醒它们

private synchronized void checkNotifyWaiters() {

 if (waiters > 0) {

  notifyAll();

 }

}



public boolean cancel(boolean mayInterruptIfRunning) {

 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { // CAS原子性更新

  checkNotifyWaiters();

  notifyListeners();

  return true;

 }

 return false;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

核心方法sync原理

该方法用于外部线程调用,响应中断异常的等待任务完成,如果任务发生了异常,那么将会把异常抛出。可以看到该方法首先使用前面介绍的await方法等待任务执行完成,然后检测如果导致当前任务执行失败的异常信息存在,那么将其抛出。源码描述如下。

// 如果导致当前任务执行失败的异常信息存在,那么将其抛出

private void rethrowIfFailed() {

 Throwable cause = cause();

 if (cause == null) {

  return;

 }

 PlatformDependent.throwException(cause);

}



public Promise<V> sync() throws InterruptedException {

 await(); // 调用await等待完成

 rethrowIfFailed();

 return this;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

核心方法syncUninterruptibly原理

该方法同sync,只不过这里调用了awaitUninterruptibly而不是await。源码描述如下。

public Promise<V> syncUninterruptibly() {

 awaitUninterruptibly();

 rethrowIfFailed();

 return this;

}
1
2
3
4
5
6
7
8
9

核心方法setSuccess原理

该方法用于设置成功执行的结果值。我们看到当result为null时设置SUCCESS作为结果。同时读者可以从源码中看到:如何设置任务不允许被取消?设置UNCANCELLABLE为result,同时在执行完成后将其替换为真实结果。源码描述如下。

// 根据结果值是否为null来选择是否使用静态变量:SUCCESS来设置执行结果

private boolean setSuccess0(V result) {

 return setValue0(result == null ? SUCCESS : result);

}



// 共用方法,用于原子性设置result结果值

private boolean setValue0(Object objResult) {

 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || // 首先尝试原子性将null修改为objResult

  RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 上一步失败后,有可能之前设置了UNCANCELLABLE标志位,表示不可取消,那么这里可以尝试将其替换为真实结果

  checkNotifyWaiters(); // 成功后唤醒等待线程

  return true;

 }

 return false;

}



public Promise<V> setSuccess(V result) {

 if (setSuccess0(result)) { // 直接调用setSuccess0完成设置,成功后通知监听器

  notifyListeners();

  return this;

 }

 throw new IllegalStateException("complete already: " + this); // 如果设置失败,那么表明任务已经被其他线程设置完成,抛出异常

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

核心方法setFailure原理

该方法用于设置任务执行异常,但是已经完成。参数cause用于指明导致任务结束执行的异常信息。源码描述如下。

// 将异常信息包装为CauseHolder对象来设置完成

private boolean setFailure0(Throwable cause) {

 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));

}



public Promise<V> setFailure(Throwable cause) {

 if (setFailure0(cause)) {

  notifyListeners(); // 完成后通知监听器

  return this;

 }

 throw new IllegalStateException("complete already: " + this, cause); // 重复设置抛出异常

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

核心方法trySuccess原理

该方法用于尝试设置任务的结果值,不过这里不会抛出IllegalStateException异常。可以看到流程一致,只不过在末尾不跑异常而是返回false。源码描述如下。

public boolean trySuccess(V result) {

 if (setSuccess0(result)) {

  notifyListeners();

  return true;

 }

 return false;

}
1
2
3
4
5
6
7
8
9
10
11
12
13

核心方法tryFailure原理

同trySuccess一样,在设置失败后返回false。源码描述如下。

public boolean tryFailure(Throwable cause) {

 if (setFailure0(cause)) {

  notifyListeners();

  return true;

 }

 return false;

}
1
2
3
4
5
6
7
8
9
10
11
12
13

核心方法addListener原理

该方法用于向Future中添加监听器对象。流程如下:

  1. 检测监听器不能为空
  2. 对当前对象上锁保证线程安全,同时调用addListener0添加监听器
  3. 添加完成后如果任务已经执行完成,那么通知监听器
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {

 if (listeners == null) { // 监听器变量为空,那么将当前对象设置为listeners对象

  listeners = listener;

 } else if (listeners instanceof DefaultFutureListeners) { // 监听器对象为默认监听器,那么直接将监听器添加到其中的监听器数组中

  ((DefaultFutureListeners) listeners).add(listener);

 } else { // 否则创建默认监听器对象,同时将当前监听器和目前需要添加的监听器放入其中

  listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);

 }

}



public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {

 checkNotNull(listener, "listener"); // 监听器不能为空

 synchronized (this) { // 保证线程安全

  addListener0(listener);

 }

 if (isDone()) { // 添加完成后如果任务已经执行完成,那么通知监听器

  notifyListeners();

 }

 return this;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

上面我们看到了一个类DefaultFutureListeners,我们继续跟进看看该类的实现原理。通过源码我们可以看到DefaultFutureListeners使用数组来保存监听器,在数组容量不够时通过扩容2倍来存放新的监听器。源码描述如下。

final class DefaultFutureListeners {

 private GenericFutureListener<? extends Future<?>>[] listeners; // 监听器数组

 private int size; // 总的监听器个数

 private int progressiveSize; // 获取任务执行进度的监听器个数

 

 DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { 

  // 创建监听器数组,同时将两个监听器放入0和1下标

  listeners = new GenericFutureListener[2]; 

  listeners[0] = first;

  listeners[1] = second;

  size = 2; // 更新监听器长度

  // 根据监听器类型是否为带进度查询的监听器类型来增加progressiveSize计数

  if (first instanceof GenericProgressiveFutureListener) { 

   progressiveSize ++;

  }

  if (second instanceof GenericProgressiveFutureListener) {

   progressiveSize ++;

  }

 }



 // 向监听器数组添加监听器

 public void add(GenericFutureListener<? extends Future<?>> l) {

  GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;

  final int size = this.size;

  if (size == listeners.length) { // 开辟2倍的数组来添加监听器

   this.listeners = listeners = Arrays.copyOf(listeners, size << 1);

  }

  listeners[size] = l;

  // 增加监听器计数

  this.size = size + 1; 

  if (l instanceof GenericProgressiveFutureListener) {

   progressiveSize ++;

  }

 }

 

 // 返回监听器列表

 public GenericFutureListener<? extends Future<?>>[] listeners() {

  return listeners;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

核心方法notifyListeners原理

该方法我们在前面看到了,该方法用于通知所有监听器。流程如下:

  1. 获取事件执行器对象
  2. 如果当前线程属于事件执行器的执行线程,那么检测调用栈深后调用notifyListenersNow通知监听器
  3. 如果是外部线程调用,那么向事件执行器提交一个Runnable任务来完成通知
private void notifyListeners() {

 EventExecutor executor = executor(); // 获取事件执行器

 if (executor.inEventLoop()) { // 当前线程属于事件执行器的执行线程

  final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // 获取TL中的当前线程执行监听器的栈深度

  final int stackDepth = threadLocals.futureListenerStackDepth();

  if (stackDepth < MAX_LISTENER_STACK_DEPTH) { // 如果当前线程通知监听器的次数小于设置的最大栈深,那么增加计数,并通知监听器

   threadLocals.setFutureListenerStackDepth(stackDepth + 1);

   try {

    notifyListenersNow();

   } finally { // 调用完成后还原之前的栈深

    threadLocals.setFutureListenerStackDepth(stackDepth);

   }

   return;

  }

  // 如果超过最大调用深度,那么将不会继续调用监听器

 }

 

 // 如果是外部线程调用,那么需要通过Runnalbe的任务交给时间执行器线程来处理

 safeExecute(executor, new Runnable() {

  @Override

  public void run() {

   notifyListenersNow(); // 注意这里将不会检测栈深,因为不是同一个线程

  }

 });

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

核心方法notifyListenersNow原理

该方法用于通知监听器。流程如下:

  1. 如果已经正在通知监听器,或者监听为空,那么直接返回,否则设置当前正在通知监听器,同时将全局监听器从成员变量中摘下放入局部变量
  2. 循环根据监听器类型来通知监听器,直到成员变量为空
private void notifyListenersNow() {

 Object listeners;

 synchronized (this) {

  // 如果已经正在通知监听器,或者监听为空,那么直接返回

  if (notifyingListeners || this.listeners == null) {

   return;

  }

  // 否则设置当前正在通知监听器,同时将全局监听器摘下

  notifyingListeners = true;

  listeners = this.listeners;

  this.listeners = null;

 }

 for (;;) { // 循环通知监听器

  if (listeners instanceof DefaultFutureListeners) { // 通知DefaultFutureListeners监听器

   notifyListeners0((DefaultFutureListeners) listeners); 

  } else { // 否则通知GenericFutureListener监听器

   notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);

  }

  synchronized (this) { // 通知完毕后看看是否还存在监听器,如果不存在,那么直接返回

   if (this.listeners == null) {

    notifyingListeners = false;

    return;

   }

   listeners = this.listeners;

   this.listeners = null;

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

核心方法notifyListeners0原理

以下两个方法用于执行实际的通知操作。我们看到:

  1. 在DefaultFutureListeners中,将遍历其中监听器列表来通知监听器(使用notifyListener0(Future future, GenericFutureListener l)方法)
  2. 在GenericFutureListener中,将会回调监听器的operationComplete方法,同时捕捉了执行异常
private void notifyListeners0(DefaultFutureListeners listeners) {

 GenericFutureListener<?>[] a = listeners.listeners();

 int size = listeners.size();

 for (int i = 0; i < size; i ++) {

  notifyListener0(this, a[i]);

 }

}



private static void notifyListener0(Future future, GenericFutureListener l) {

 try {

  l.operationComplete(future);

 } catch (Throwable t) {

  logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

核心方法notifyProgressiveListeners原理

该方法用于通知可以获取任务进度的监听器,参数progress表示当前任务执行进度,total表示当前任务总进度。流程如下:

  1. 获取进度监听器列表,若没有监听器,那么直接返回
  2. 判断当前线程属于事件执行器执行线程,如果不是,那么将流程包装为Runnable任务提交到事件执行器的任务队列,若是执行器线程,那么直接通知监听器
  3. 根据当前监听器列表数组类型调用对应方法完成通知(也即泛型是否为ProgressiveFuture),最终通知方法为notifyProgressiveListeners0
void notifyProgressiveListeners(final long progress, final long total) {

 final Object listeners = progressiveListeners(); // 获取进度监听器列表

 if (listeners == null) { // 没有监听器,直接返回

  return;

 }

 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;

 EventExecutor executor = executor();

 if (executor.inEventLoop()) { // 当前线程属于事件执行器执行线程

  // 根据当前监听器列表数组类型调用对应方法完成通知(泛型是否为ProgressiveFuture)

  if (listeners instanceof GenericProgressiveFutureListener[]) {

   notifyProgressiveListeners0(

    self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total); // 通知GenericProgressiveFutureListener数组类型

  } else {

   notifyProgressiveListener0(

    self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total); // 通知GenericProgressiveFutureListener<ProgressiveFuture<V>> 数组类型

  }

 } else { // 线程不属于事件执行器的执行线程,那么直接将上述流程包装为Runnable任务,由事件执行线程来执行,保证线程安全

  if (listeners instanceof GenericProgressiveFutureListener[]) {

   final GenericProgressiveFutureListener<?>[] array =

    (GenericProgressiveFutureListener<?>[]) listeners;

   safeExecute(executor, new Runnable() {

    @Override

    public void run() {

     notifyProgressiveListeners0(self, array, progress, total);

    }

   });

  } else {

   final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =

    (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;

   safeExecute(executor, new Runnable() {

    @Override

    public void run() {

     notifyProgressiveListener0(self, l, progress, total);

    }

   });

  }

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

核心方法progressiveListeners原理

该方法用于获取任务进度监听器。流程如下:

  1. 若当前Future的监听器列表对象为空,那么直接返回
  2. 若监听器列表对象为DefaultFutureListeners,那么将其中的监听器数组中的进度监听器复制到新的数组中并返回
  3. 若监听器列表对象为GenericProgressiveFutureListener监听器对象,那么直接返回
private synchronized Object progressiveListeners() {

 Object listeners = this.listeners;

 if (listeners == null) { // 当前监听器列表为空

  return null;

 }



 if (listeners instanceof DefaultFutureListeners) { // 监听器列表对象为DefaultFutureListeners,那么将其中的监听器数组中的进度监听器复制到新的数组中并返回

  DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;

  int progressiveSize = dfl.progressiveSize();

  switch (progressiveSize) { // 这里优化了进度监听器为0和1的情况,这时不需要创建数组

   case 0:

    return null;

   case 1:

    for (GenericFutureListener<?> l: dfl.listeners()) {

     if (l instanceof GenericProgressiveFutureListener) {

      return l;

     }

    }

    return null;

  }

  // 创建进度监听器数组,并遍历DefaultFutureListeners监听器列表,找到进度监听器将其放入数组中

  GenericFutureListener<?>[] array = dfl.listeners();

  GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];

  for (int i = 0, j = 0; j < progressiveSize; i ++) {

   GenericFutureListener<?> l = array[i];

   if (l instanceof GenericProgressiveFutureListener) {

    copy[j ++] = (GenericProgressiveFutureListener<?>) l;

   }

  }

  return copy;

 } else if (listeners instanceof GenericProgressiveFutureListener) { // 若直接为GenericProgressiveFutureListener监听器对象,那么直接返回

  return listeners;

 } else { // 其他监听器类型将返回null

  return null;

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

核心方法notifyProgressiveListener0原理

以下两个notifyProgressiveListeners0方法,根据监听器类型不一样进行回调进度监听器。从源码中很清晰的看到:对于 GenericProgressiveFutureListener<?>[] listeners数组,那么遍历监听器列表调用notifyProgressiveListener0。而对于GenericProgressiveFutureListener l,那么直接在try catch中回调operationProgressed方法即可,通过try catch可以避免调用异常导致当前线程退出。源码描述如下。

private static void notifyProgressiveListeners0(

 ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {

 for (GenericProgressiveFutureListener<?> l: listeners) { // 遍历调用

  if (l == null) {

   break;

  }

  notifyProgressiveListener0(future, l, progress, total);

 }

}



private static void notifyProgressiveListener0(

 ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) { 

 try {

  l.operationProgressed(future, progress, total); // 单个监听器,直接调用

 } catch (Throwable t) {

  logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35