# RxJava 架构原理与设计思维二

RxJava 架构原理与设计思维一中我们说:

  1. reactivex中使用Observable来表示被观察者
  2. reactivex中使用Subject来表示桥接器,既可以表示被观察者又可以表示观察者
  3. reactivex中使用Scheduler来指定:异步事件的执行线程(SubscribeOn)、观察者回调线程(observeOn)

那么,本篇将分析RxJava 的基于reactivex规范定义的具体实现。读者这里需要拥有一个混沌视角:观察者模式和生产者消费者模型,对的就是这样。本内容暂时不涉及到源码分析,读者了解RxJava的设计理念和使用方式即可,后面我们将会以WebFlux作为源码研究对象,因为它用得足够广泛,且也是标准实现了Reactive-Streams 规范。

Hello World

我们先来看几个例子。以下代码以Flowable类包装了一个"Hello world"字符串,使它成为了一个可观察者,随后,我们调用其subscribe方法,传入lambda表达式,订阅该Observable,随后将在主线程中输出Hello world。

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {

  public static void main(String[] args) {

    Flowable.just("Hello world").subscribe(System.out::println);

 }

}
1
2
3
4
5
6
7
8
9
10
11
12
13

基础类

RxJava 3提供了几个基类,我们可以使用他们来构建可观察者:

  1. io.reactivex.rxjava3.core.Flowable:表示 0..N 个输入元素,支持 Reactive-Streams 规范(JDK9引入到Java中,形成了Java的Reactor编程的API,最初由Kaazing,Netflix,Pivotal,Red Hat,Twitter,Typesafe公司一起合作制定),支持背压(backpressure)
  2. io.reactivex.rxjava3.core.Observable:表示 0..N 个输入元素,用于支持(reactivex可观察者规范)不支持背压
  3. io.reactivex.rxjava3.core.Single:表示一个输入元素
  4. io.reactivex.rxjava3.core.Completable:没有任何输入元素,仅表示完成或者失败
  5. io.reactivex.rxjava3.core.Maybe:表示一个或者零个输入元素

我们可以看到,对于RxJava 3来说既实现了reactivex的规范,也实现了Reactive-Streams 规范,而对于WebFlux而言,就是基于Reactive-Streams 规范来实现的,RxJava早于Reactive-Streams 规范出现,所以在RxJava 2时兼容了Reactive-Streams 规范,没办法,这么多公司认定的规范,同时也被引入了JDK 9,所以必须支持对吧,不然脱离了API,那么将会被抛弃,因为写出的代码不具有可扩展性(想象一下:不面向JDBC使用数据库时将会发生什么)。而对于:Flowable、Observable、Single、Completable、Maybe而言,均为可观察者,他们的输出可以是 多个、一个、空,取决于定义。

RxJava 3中由于兼容了Reactive-Streams 规范,所以定义了如下表所示的类和接口:

  1. Type 列表示上述介绍的支持类型
  2. Class 列表示不同可观察者的基类
  3. Interface 列表示实现的接口类型
  4. Consumer 列表示当前类所支持的消费者基类

可以看到生产者类(可观察者)----消费者类(观察者)将配对出现:Flowable --- Subscriber(这两个类均为Reactive Streams API定义,所以没有接口定义中没有 FlowableSource)、Observable --- Observer、Single --- SingleObserver、Maybe --- MaybeObserver、Completable--- CompletableObserver。

img

Upstream, downstream

我们知道,对于函数式编程和响应式变成来说,操作的便是数据流:由源流入到目的地,而这时不难推理出:将出现上下游的概念。RxJava中的数据流由一个源(Flowable、Observable、Single、Completable、Maybe)、零个或多个中间操作、一个数据消费者组成,看如下代码。这里,假设我们把目光放在 operator2 操作符上,此时左边指向源的方向叫做上游(Upstream),右边的consumer称为下游。

source.operator1().operator2().operator3().subscribe(consumer);
1

Backpressure

当数据流通过异步处理时,每个处理步骤可能以不同的速度执行不同的事情,也即每个异步处理线程的处理速度不一样。为了避免由于这些异步处理步骤过多,导致由于处理速度跟不上产生数据的速度而增加了缓冲区,此时将会增加内存的使用率,而此时如果再继续向下游的处理线程发送数据,将会导致数据丢失或者出现异常。而所谓的背压,就是消费方可以通知生产方自身的处理情况,要求生产方暂缓发送数据,等待自身数据处理完成后再继续发送。如下图所示,背压(Backpressure)是一种重要的反馈机制,使得整个生产---消费系统得以优雅地响应负载,而不是在负载下崩溃,如果下游组件比较空闲,则可以向上游组件发出信号,请求获得更多的调用。

img

在RxJava中,Flowable类专门指定用于支持背压操作,而Observable类专用于非背压操作。其他类型:Single类、 Maybe类和Completable类不支持也不应该支持反压,因为它们的语义仅仅表示一个或者零个输入。

Assembly time

所谓的装配时间,就如同我们使用JDK 8 的Stream一样,处理数据流的中间操作。看如下代码,同Stream一样,没有最终消耗数据的操作,比如collect,此时将不会进行任何数据的消费,仅仅只是定义函数式编程的数据处理流。

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);
1
2
3

Subscription time

所谓订阅时间,就如同Stream的动作方法一样,比如collect,此时将会执行所有在Assembly time过程中定义的函数数据处理流程来处理可观察者(事件生产者)产生的数据。看如下代码,我们调用了subscribe订阅方法,此时将会导致Flowable生产数据,并调用数据处理流程,将处理结果回调System.out::println输出。

flow.subscribe(System.out::println)
1

Runtime

所谓运行时,则表示生产方通过emitter发布事件时的状态。如下面的代码,由于我们没有实现异步线程,所以emitter.onNext(time)和subscribe(System.out::println, Throwable::printStackTrace)均在同一个方法中执行,而这里的emitter.onNext(time)便表示Runtime状态。

Observable.create(emitter -> {

  while (!emitter.isDisposed()) { // emitter有效

    long time = System.currentTimeMillis();

    emitter.onNext(time); // 将当前时间发送给下游

    if (time % 2 != 0) { // 当当前时间不为偶数时,发送给下游错误异常信息,并结束

      emitter.onError(new IllegalStateException("Odd millisecond!"));

      break;

    }

  }

})

.subscribe(System.out::println, Throwable::printStackTrace); // 下游订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

Simple background computation

这里描述一个较为复杂的用例。RxJava的一个常见用例是在后台线程上运行一些耗时操作,如:IO操作,然后将IO操作结果放到UI线程上进行展示。我们说过,为了保证线程安全,通常我们会将一些重量级的操作放到一个单独的处理线程上执行,而不是在多线程中通过上锁的方式来操作,因为上锁也有性能损耗。

import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
  Thread.sleep(1000); // 表示一些耗时计算
  return "Done";
})
 .subscribeOn(Schedulers.io()) // 注意:一定要将计算操作通过该方法(RxJava 架构原理与设计思维一中介绍过)放到IO线程上执行
 .observeOn(Schedulers.single()) // 执行完毕后将订阅者的处理切换到single线程中执行(模拟UI线程)
 .subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- 当前线程等待Flowable处理结束
1
2
3
4
5
6
7
8
9

上述的编程风格称之为流式编程,非常符合函数式编程的编写方式,当然,我们也可以把它拆开编写,当然,你要不嫌麻烦的话。

Flowable<String> source = Flowable.fromCallable(() -> {
  Thread.sleep(1000); // 表示一些耗时计算
  return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
1
2
3
4
5
6
7
8

Schedulers

RxJava操作方法(map、flatmap、filter等数据操作)不直接与线程交互,就像在RxJava 架构原理与设计思维一中介绍的那样,而是通过方法设置调度器来进行线程切换。RxJava 3提供了几个可通过Schedulers工具类访问的标准调度器:

  1. Schedulers.computation():表示固定数量的线程,通常用于运行CPU密集的计算工作,大多数异步操作都将此作为它们的默认调度器
  2. Schedulers.io():表示执行IO操作的线程池
  3. Schedulers.single():表示一个拥有一个任务队列的单线程线程池
  4. Schedulers.trampoline():多个线程组成的线程池,以FIFO的方式运行放入的任务,通常用于测试目的

这些调度器在所有JVM平台上都可用,但在一些特定的平台上,如:Android、JFX、Swing,有它们自己定义的调度器:AndroidSchedulers.mainThread()、SwingScheduler.instance()、JavaFXSchedulers.gui(),均用于处理自身的UI操作。

Concurrency within a flow

前面介绍过,我们可以通过observeOn将操作放到线程池中异步执行,此时将不会阻塞当前线程,但是有时我们想让当前定义Flowable的线程来处理输出的数据,那么我们可以使用blockingSubscribe方法,此时当前定义Flowable的线程将负责处理System.out::println的输出行为。并且需要注意的是:map(v -> v * v)操作仅仅只是使用了Schedulers.computation()线程池中的一个线程来处理计算操作,并没有使用多个线程。

Flowable.range(1, 10)

 .observeOn(Schedulers.computation())

 .map(v -> v * v)

 .blockingSubscribe(System.out::println); // 主线程负责完成其中操作
1
2
3
4
5
6
7

Parallel processing

如果我们想要使用并行操作,那么可以这样编写代码,在flatMap中,将操作包装为一个新的Flowable,并调用subscribeOn表示将该Flowable的map操作放入到线程池中执行。如下所示。

Flowable.range(1, 10)

 .flatMap(v ->

   Flowable.just(v)

   .subscribeOn(Schedulers.computation())

   .map(w -> w * w)

 )

 .blockingSubscribe(System.out::println); // 主线程负责完成其中操作
1
2
3
4
5
6
7
8
9
10
11
12
13