RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
RxJava 到底是什么
一个词:异步。
RxJava 好在哪
一个词:简洁。
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。
Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener
Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件
RxJava 有四个基本概念:
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅)
- 事件
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
1) 创建 Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的
1 | Observer<String> observer = new Observer<String>() { |
2) 创建 Observable
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:
1 | Observable<String> observable = Observable.create(emitter -> { |
3) Subscribe (订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。
1 | observable.subscribe(observer); |
1 | TestRxjavaActivity: onSubscribe: |
4) 场景示例
a. 打印字符串数组
将字符串数组 names 中的所有字符串依次打印出来:
1 | String[] names = new String[]{"qq", "ww", "ee"}; |
1 | TestRxjavaActivity: accept: qq |
b. 由 id 取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中
1 | Observable.create((ObservableOnSubscribe<Drawable>) emitter -> { |
线程控制 :Scheduler(一)
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
1 | Observable.create((ObservableOnSubscribe<Drawable>) emitter -> { |
1 | TestRxjavaActivity: onSubscribe: Thread[main,5,main] |
加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。
变换
- map(): 事件对象的直接变换,它是 RxJava 最常用的变换 ,是一对一的转化。
- flatMap():一对多的转化,flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
- throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器
变换的原理:lift() [比较晦涩难懂,抽时间研究]
在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
compose: 对 Observable 整体的变换 [比较晦涩难懂,抽时间研究]
它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。
1 | public class LiftAllTransformer implements Observable.Transformer<Integer, String> { |
1 | Transformer liftAll = new LiftAllTransformer(); |
像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理
线程控制:Scheduler (二)
1 | Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 |
通过 observeOn() 的多次调用,程序实现了线程的多次切换。不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的,只有第一个subscribeOn() 起作用。
subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):
subscribeOn() 原理图:
observeOn() 原理图:
从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此observeOn() 控制的是它后面的线程。
RxJava 的适用场景和使用方式
1. 与 Retrofit 的结合
以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:
1 |
|
在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:
1 | getUser(userId, new Callback<User>() { |
而使用 RxJava 形式的 API,定义同样的请求是这样的:
1 |
|
1 | getUser(userId) |
拿到数据后,需要进行数据处理再显示,可以后台处理,切换线程:
1 | getUser(userId) |
接口嵌套调用:
1 |
|
1 | getToken() |
2. RxBinding
1 | RxView.clickEvents(button) |
3. 各种异步操作
前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。
4. RxBus
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。