RxJava知识点记录(扔物线)

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。


RxJava 到底是什么

一个词:异步。

RxJava 好在哪

一个词:简洁。

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。

Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener

Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件


RxJava 有四个基本概念:

  • Observable (可观察者,即被观察者)
  • Observer (观察者)
  • subscribe (订阅)
  • 事件

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。


1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext: " + s);
}

@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};

2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

1
2
3
4
5
6
7
Observable<String> observable = Observable.create(emitter -> {
Log.d(TAG, "subscribe: ");
emitter.onNext("emitter send data --- 1");
emitter.onNext("emitter send data --- 2");
emitter.onNext("emitter send data --- 3");
emitter.onComplete();
});

3) Subscribe (订阅)

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。

1
observable.subscribe(observer);

1
2
3
4
5
6
TestRxjavaActivity: onSubscribe: 
TestRxjavaActivity: subscribe:
TestRxjavaActivity: onNext: emitter send data --- 1
TestRxjavaActivity: onNext: emitter send data --- 2
TestRxjavaActivity: onNext: emitter send data --- 3
TestRxjavaActivity: onComplete:

4) 场景示例

a. 打印字符串数组

将字符串数组 names 中的所有字符串依次打印出来:

1
2
3
4
String[] names = new String[]{"qq", "ww", "ee"};
Disposable disposable = Observable.fromArray(names).subscribe(s ->
Log.d(TAG, "accept: " + s)
);

1
2
3
TestRxjavaActivity: accept: qq
TestRxjavaActivity: accept: ww
TestRxjavaActivity: accept: ee
b. 由 id 取得图片并显示

由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.create((ObservableOnSubscribe<Drawable>) emitter -> {
Drawable drawable = getDrawable(R.mipmap.ic_launcher);
emitter.onNext(drawable);
emitter.onComplete();
}).subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(@NonNull Disposable d) { }
@Override
public void onNext(@NonNull Drawable drawable) {
((ImageView) (findViewById(R.id.imageView))).setImageDrawable(drawable);
}
@Override
public void onError(@NonNull Throwable e) { }
@Override
public void onComplete() { }
});






线程控制 :Scheduler(一)

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create((ObservableOnSubscribe<Drawable>) emitter -> {
Log.d(TAG, "subscribe: " + Thread.currentThread());
Drawable drawable = getDrawable(R.mipmap.ic_launcher);
emitter.onNext(drawable);
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: " + Thread.currentThread());
}

@Override
public void onNext(@NonNull Drawable drawable) {
Log.d(TAG, "onNext: " + Thread.currentThread());
((ImageView) (findViewById(R.id.imageView))).setImageDrawable(drawable);
}

@Override
public void onError(@NonNull Throwable e) {
}

@Override
public void onComplete() {
}
});

1
2
3
TestRxjavaActivity: onSubscribe: Thread[main,5,main]
TestRxjavaActivity: subscribe: Thread[RxCachedThreadScheduler-1,5,main]
TestRxjavaActivity: onNext: Thread[main,5,main]

加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

变换

  • map(): 事件对象的直接变换,它是 RxJava 最常用的变换 ,是一对一的转化。
  • flatMap():一对多的转化,flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

  • throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器

变换的原理:lift() [比较晦涩难懂,抽时间研究]

在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

compose: 对 Observable 整体的变换 [比较晦涩难懂,抽时间研究]

它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。

1
2
3
4
5
6
7
8
9
10
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}

1
2
3
4
5
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理

线程控制:Scheduler (二)

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定

通过 observeOn() 的多次调用,程序实现了线程的多次切换。不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的,只有第一个subscribeOn() 起作用。

subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

subscribeOn() 原理图:

observeOn() 原理图:

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此observeOn() 控制的是它后面的线程。


RxJava 的适用场景和使用方式

1. 与 Retrofit 的结合

以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

1
2
@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

1
2
3
4
5
6
7
8
9
10
11
12
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
userView.setUser(user);
}

@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};

而使用 RxJava 形式的 API,定义同样的请求是这样的:

1
2
@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
getUser(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable error) {
// Error handling
...
}
});

拿到数据后,需要进行数据处理再显示,可以后台处理,切换线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
getUser(userId)
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable error) {
// Error handling
...
}
});

接口嵌套调用:

1
2
3
4
5
@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> onNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable error) {
// Error handling
...
}
});
2. RxBinding
1
2
3
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(clickAction);
3. 各种异步操作

前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。

4. RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。

参考文章:
https://segmentfault.com/a/1190000011358934