# Netty 核心原理十五 DefaultPromise 原理
Promise 原理
该接口也是一个特殊的Future接口,用于对Future代表的异步任务结果操作。我们看到这里面的两大核心方法:设置成功、设置失败同时保存异常结果。读者需要注意的是:不可重复的设置结果,如果在多线程间设置执行结果,那么可以使用tryXXX方法,因为该方法将不会抛出IllegalStateException异常。源码描述如下。
public interface Promise<V> extends Future<V> {
/**
* 设置当前Future代表的异步任务执行结果完成,同时参数result用于设置结果。如果当前结果已经被设置完成,那么将会抛出无效参数异常
*/
Promise<V> setSuccess(V result);
/**
* 尝试设置当前Future代表的异步任务执行结果完成,同时参数result用于设置结果。如果当前任务已经被设置完成,那么返回false,否则返回true
*/
boolean trySuccess(V result);
/**
* 设置当前Future代表的异步任务执行失败,同时参数cause用于设置异常信息。如果当前结果已经被设置完成,那么将会抛出无效参数异常
*/
Promise<V> setFailure(Throwable cause);
/**
* 尝试设置当前Future代表的异步任务执行失败,同时参数cause用于设置异常信息。如果当前结果已经被设置完成,那么返回false,否则返回true
*/
boolean tryFailure(Throwable cause);
/**
* 用于设置该Future代表的异步任务在执行后,是否可以被取消
*/
boolean setUncancellable();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
DefaultPromise 原理
DefaultPromise 为Promise接口的默认实现。在其中完成了所有的接口的实现。方法较多,所以笔者将他们拆开方便读者浏览。
核心变量与构造器
通过变量定义我们得知:
- result异步执行结果由RESULT_UPDATER原子性更新,从而保证线程安全
- EventExecutor executor事件执行器用于通知监听器
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); // 最大监听器栈深度,在调用监听器时使用,用于限制调用
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER; // 原子性更新结果引用对象(JUC的基础?)
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); // 当执行结果为null时,设置的成功对象
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"); // 用于标识当前执行的任务不能够被取消
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)")); // 用于在调用cancel方法时标识取消的占位对象
static {
// 初始化result属性原子操作对象
AtomicReferenceFieldUpdater<DefaultPromise, Object> updater =
PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result");
RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class,
Object.class, "result") : updater;
}
private volatile Object result; // 代表执行结果
private final EventExecutor executor; // 用于通知监听器的事件执行器对象
private Object listeners; // 监听器对象
private short waiters; // 等待当前任务执行完成的线程数
private boolean notifyingListeners; // 用于标识当前正在通知监听器
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
// 静态内部类。用于作为异常信息占位对象
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
核心方法await原理
该方法用于在外部线程调用,响应中断的等待当前异步执行任务完成。流程如下:
- 如果任务已经完成,直接返回
- 如果线程被中断,那么抛出中断异常
- 检测死锁(自己等待自己)
- 在当前Promise对象上等待执行完成
// 用于检测死锁
protected void checkDeadLock() {
EventExecutor e = executor(); // 获取通过构造函数传入的执行器对象
if (e != null && e.inEventLoop()) { // 检测当前线程是否为事件执行器中的线程(自己等待自己?)
throw new BlockingOperationException(toString());
}
}
// 判断当前任务是否执行完成
public boolean isDone() {
return isDone0(result);
}
private static boolean isDone0(Object result) { // 结果不为空且不为不能取消的占位符
return result != null && result != UNCANCELLABLE;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) { // 已经完成,直接返回
return this;
}
if (Thread.interrupted()) { // 线程被中断,那么抛出中断异常
throw new InterruptedException(toString());
}
checkDeadLock(); // 检测死锁
synchronized (this) { // 等待在当前Promise对象上
while (!isDone()) { // 仍为完成时等待,同时增加等待线程数
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
核心方法awaitUninterruptibly原理
该方法用于在外部线程调用,不响应中断的等待当前异步执行任务完成。流程如下:
- 如果任务已经完成,直接返回
- 检测死锁
- 在当前Promise对象上等待任务执行完成,如果等待过程中线程被中断,那么设置中断标识interrupted
- 在任务执行后,如果设置了interrupted中断标志位,那么调用Thread.currentThread().interrupt()方法重新设置线程标志位
public Promise<V> awaitUninterruptibly() {
if (isDone()) { // 已经完成,直接返回
return this;
}
checkDeadLock(); // 检测死锁
boolean interrupted = false; // 标志位用于标识线程在等待过程中被中断
synchronized (this) { // 等待在当前Promise对象上
while (!isDone()) {
incWaiters();
try {
wait();
} catch (InterruptedException e) { // 与await()不同,这里捕捉了中断异常同时设置了interrupted标志位
interrupted = true;
} finally {
decWaiters();
}
}
}
if (interrupted) { // 由于异常捕捉将会清除中断标志位,所以这里重新设置线程标志位
Thread.currentThread().interrupt();
}
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
核心方法cancel原理
该方法用于取消异步执行的任务,参数mayInterruptIfRunning表示当任务正在执行时,是否通过中断停止执行。我们看到这里直接通过compareAndSet将result结果修改为CANCELLATION_CAUSE_HOLDER,如果修改成功,那么唤醒等待任务执行完成的线程,同时通知监听器。源码描述如下。
// 如果有等待线程那么唤醒它们
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { // CAS原子性更新
checkNotifyWaiters();
notifyListeners();
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
核心方法sync原理
该方法用于外部线程调用,响应中断异常的等待任务完成,如果任务发生了异常,那么将会把异常抛出。可以看到该方法首先使用前面介绍的await方法等待任务执行完成,然后检测如果导致当前任务执行失败的异常信息存在,那么将其抛出。源码描述如下。
// 如果导致当前任务执行失败的异常信息存在,那么将其抛出
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
public Promise<V> sync() throws InterruptedException {
await(); // 调用await等待完成
rethrowIfFailed();
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
核心方法syncUninterruptibly原理
该方法同sync,只不过这里调用了awaitUninterruptibly而不是await。源码描述如下。
public Promise<V> syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
2
3
4
5
6
7
8
9
核心方法setSuccess原理
该方法用于设置成功执行的结果值。我们看到当result为null时设置SUCCESS作为结果。同时读者可以从源码中看到:如何设置任务不允许被取消?设置UNCANCELLABLE为result,同时在执行完成后将其替换为真实结果。源码描述如下。
// 根据结果值是否为null来选择是否使用静态变量:SUCCESS来设置执行结果
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
// 共用方法,用于原子性设置result结果值
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || // 首先尝试原子性将null修改为objResult
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 上一步失败后,有可能之前设置了UNCANCELLABLE标志位,表示不可取消,那么这里可以尝试将其替换为真实结果
checkNotifyWaiters(); // 成功后唤醒等待线程
return true;
}
return false;
}
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) { // 直接调用setSuccess0完成设置,成功后通知监听器
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this); // 如果设置失败,那么表明任务已经被其他线程设置完成,抛出异常
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
核心方法setFailure原理
该方法用于设置任务执行异常,但是已经完成。参数cause用于指明导致任务结束执行的异常信息。源码描述如下。
// 将异常信息包装为CauseHolder对象来设置完成
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners(); // 完成后通知监听器
return this;
}
throw new IllegalStateException("complete already: " + this, cause); // 重复设置抛出异常
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
核心方法trySuccess原理
该方法用于尝试设置任务的结果值,不过这里不会抛出IllegalStateException异常。可以看到流程一致,只不过在末尾不跑异常而是返回false。源码描述如下。
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
核心方法tryFailure原理
同trySuccess一样,在设置失败后返回false。源码描述如下。
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
核心方法addListener原理
该方法用于向Future中添加监听器对象。流程如下:
- 检测监听器不能为空
- 对当前对象上锁保证线程安全,同时调用addListener0添加监听器
- 添加完成后如果任务已经执行完成,那么通知监听器
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) { // 监听器变量为空,那么将当前对象设置为listeners对象
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) { // 监听器对象为默认监听器,那么直接将监听器添加到其中的监听器数组中
((DefaultFutureListeners) listeners).add(listener);
} else { // 否则创建默认监听器对象,同时将当前监听器和目前需要添加的监听器放入其中
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener"); // 监听器不能为空
synchronized (this) { // 保证线程安全
addListener0(listener);
}
if (isDone()) { // 添加完成后如果任务已经执行完成,那么通知监听器
notifyListeners();
}
return this;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
上面我们看到了一个类DefaultFutureListeners,我们继续跟进看看该类的实现原理。通过源码我们可以看到DefaultFutureListeners使用数组来保存监听器,在数组容量不够时通过扩容2倍来存放新的监听器。源码描述如下。
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners; // 监听器数组
private int size; // 总的监听器个数
private int progressiveSize; // 获取任务执行进度的监听器个数
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
// 创建监听器数组,同时将两个监听器放入0和1下标
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2; // 更新监听器长度
// 根据监听器类型是否为带进度查询的监听器类型来增加progressiveSize计数
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
// 向监听器数组添加监听器
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) { // 开辟2倍的数组来添加监听器
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
// 增加监听器计数
this.size = size + 1;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
// 返回监听器列表
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
核心方法notifyListeners原理
该方法我们在前面看到了,该方法用于通知所有监听器。流程如下:
- 获取事件执行器对象
- 如果当前线程属于事件执行器的执行线程,那么检测调用栈深后调用notifyListenersNow通知监听器
- 如果是外部线程调用,那么向事件执行器提交一个Runnable任务来完成通知
private void notifyListeners() {
EventExecutor executor = executor(); // 获取事件执行器
if (executor.inEventLoop()) { // 当前线程属于事件执行器的执行线程
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // 获取TL中的当前线程执行监听器的栈深度
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) { // 如果当前线程通知监听器的次数小于设置的最大栈深,那么增加计数,并通知监听器
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally { // 调用完成后还原之前的栈深
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
// 如果超过最大调用深度,那么将不会继续调用监听器
}
// 如果是外部线程调用,那么需要通过Runnalbe的任务交给时间执行器线程来处理
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow(); // 注意这里将不会检测栈深,因为不是同一个线程
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
核心方法notifyListenersNow原理
该方法用于通知监听器。流程如下:
- 如果已经正在通知监听器,或者监听为空,那么直接返回,否则设置当前正在通知监听器,同时将全局监听器从成员变量中摘下放入局部变量
- 循环根据监听器类型来通知监听器,直到成员变量为空
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 如果已经正在通知监听器,或者监听为空,那么直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
// 否则设置当前正在通知监听器,同时将全局监听器摘下
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) { // 循环通知监听器
if (listeners instanceof DefaultFutureListeners) { // 通知DefaultFutureListeners监听器
notifyListeners0((DefaultFutureListeners) listeners);
} else { // 否则通知GenericFutureListener监听器
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) { // 通知完毕后看看是否还存在监听器,如果不存在,那么直接返回
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
核心方法notifyListeners0原理
以下两个方法用于执行实际的通知操作。我们看到:
- 在DefaultFutureListeners中,将遍历其中监听器列表来通知监听器(使用notifyListener0(Future future, GenericFutureListener l)方法)
- 在GenericFutureListener中,将会回调监听器的operationComplete方法,同时捕捉了执行异常
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
核心方法notifyProgressiveListeners原理
该方法用于通知可以获取任务进度的监听器,参数progress表示当前任务执行进度,total表示当前任务总进度。流程如下:
- 获取进度监听器列表,若没有监听器,那么直接返回
- 判断当前线程属于事件执行器执行线程,如果不是,那么将流程包装为Runnable任务提交到事件执行器的任务队列,若是执行器线程,那么直接通知监听器
- 根据当前监听器列表数组类型调用对应方法完成通知(也即泛型是否为ProgressiveFuture),最终通知方法为notifyProgressiveListeners0
void notifyProgressiveListeners(final long progress, final long total) {
final Object listeners = progressiveListeners(); // 获取进度监听器列表
if (listeners == null) { // 没有监听器,直接返回
return;
}
final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
EventExecutor executor = executor();
if (executor.inEventLoop()) { // 当前线程属于事件执行器执行线程
// 根据当前监听器列表数组类型调用对应方法完成通知(泛型是否为ProgressiveFuture)
if (listeners instanceof GenericProgressiveFutureListener[]) {
notifyProgressiveListeners0(
self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total); // 通知GenericProgressiveFutureListener数组类型
} else {
notifyProgressiveListener0(
self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total); // 通知GenericProgressiveFutureListener<ProgressiveFuture<V>> 数组类型
}
} else { // 线程不属于事件执行器的执行线程,那么直接将上述流程包装为Runnable任务,由事件执行线程来执行,保证线程安全
if (listeners instanceof GenericProgressiveFutureListener[]) {
final GenericProgressiveFutureListener<?>[] array =
(GenericProgressiveFutureListener<?>[]) listeners;
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyProgressiveListeners0(self, array, progress, total);
}
});
} else {
final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
(GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyProgressiveListener0(self, l, progress, total);
}
});
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
核心方法progressiveListeners原理
该方法用于获取任务进度监听器。流程如下:
- 若当前Future的监听器列表对象为空,那么直接返回
- 若监听器列表对象为DefaultFutureListeners,那么将其中的监听器数组中的进度监听器复制到新的数组中并返回
- 若监听器列表对象为GenericProgressiveFutureListener监听器对象,那么直接返回
private synchronized Object progressiveListeners() {
Object listeners = this.listeners;
if (listeners == null) { // 当前监听器列表为空
return null;
}
if (listeners instanceof DefaultFutureListeners) { // 监听器列表对象为DefaultFutureListeners,那么将其中的监听器数组中的进度监听器复制到新的数组中并返回
DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
int progressiveSize = dfl.progressiveSize();
switch (progressiveSize) { // 这里优化了进度监听器为0和1的情况,这时不需要创建数组
case 0:
return null;
case 1:
for (GenericFutureListener<?> l: dfl.listeners()) {
if (l instanceof GenericProgressiveFutureListener) {
return l;
}
}
return null;
}
// 创建进度监听器数组,并遍历DefaultFutureListeners监听器列表,找到进度监听器将其放入数组中
GenericFutureListener<?>[] array = dfl.listeners();
GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
for (int i = 0, j = 0; j < progressiveSize; i ++) {
GenericFutureListener<?> l = array[i];
if (l instanceof GenericProgressiveFutureListener) {
copy[j ++] = (GenericProgressiveFutureListener<?>) l;
}
}
return copy;
} else if (listeners instanceof GenericProgressiveFutureListener) { // 若直接为GenericProgressiveFutureListener监听器对象,那么直接返回
return listeners;
} else { // 其他监听器类型将返回null
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
核心方法notifyProgressiveListener0原理
以下两个notifyProgressiveListeners0方法,根据监听器类型不一样进行回调进度监听器。从源码中很清晰的看到:对于 GenericProgressiveFutureListener<?>[] listeners
数组,那么遍历监听器列表调用notifyProgressiveListener0。而对于GenericProgressiveFutureListener l,那么直接在try catch中回调operationProgressed方法即可,通过try catch可以避免调用异常导致当前线程退出。源码描述如下。
private static void notifyProgressiveListeners0(
ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress, long total) {
for (GenericProgressiveFutureListener<?> l: listeners) { // 遍历调用
if (l == null) {
break;
}
notifyProgressiveListener0(future, l, progress, total);
}
}
private static void notifyProgressiveListener0(
ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
try {
l.operationProgressed(future, progress, total); // 单个监听器,直接调用
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()", t);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35