拥抱RxJava(番外篇):关于RxJava的Tips & Tricks

前言:

起初写 拥抱RxJava 系列文章。只是因为看到很多人在使用RxJava时候,并没有很正确的理解Reactive Programming。仅仅在项目中使用了Retrofit的Rx Adapter或者使用了一点点RxBus就写道自己的项目中用了RxJava,并以此传道。我觉得这样是不好的。所以写了这一系列更好的介绍RxJava。 但是可能个人的语言能力确实略差,每次都会让读者有些许误解。今天这篇我们就放松一下,分享一下我使用RxJava时的一些Tips & Tricks (本文在为指定背景的情况下,使用的是 RxJava 2.x 版本。 )

这篇文章很多借鉴了这个presentation中的一些技巧: Common RxJava Mistakes (视频需要科学上网)

0 RxJava 不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!

很多人使用RxJava仅仅是因为线程切换方便,或者是因为Retrofit提供了这样一个炫酷的返回方式,或者仅仅是因为RxJava这种链式调用很炫酷,又或是因为大家都用RxJava我不用怕跟不上节奏。
如果你使用RxJava仅仅是因为如上几个原因,我建议你放弃RxJava。因为RxJava给你带来的坑将远多于你从RxJava中获得的便利。曾经有一个老爷爷说过一句话:

With Great Power Comes Great Responsibility
力量越大,责任越大

RxJava能做的远不仅仅是切换线程或者简单的变换(map()filter()等等)。所以当然我这篇不可能覆盖所有的 RxJava 的坑。如果你想使用RxJava,回头问一下自己使用这个库的初衷。

1 observeOn vs subscribeOn

这两个操作符可能对很多人是最常用的两个了。 然而这中间也有很多大坑在这里。

1.1 subscribeOn 控制上游,observeOn控制下游

很多人误以为 subscribeOn 控制的是Observable的生成的线程而observeOn控制的是 subscribe() 方法发生的线程。 当然,这点你们可以怪扔物线大神,毕竟他在他对RxJava最著名的帖子中写道:

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。 ——扔物线

这个观点不能说是错的(这点我稍后再将),但是更多的是 subscribeOn控制整个上游,而observeOn控制整个下游。 举个例子:

1
2
3
4
5
6
7
Observable.just("1","2","3")
.map(x -> x.length())
.subscribeOn(Schedulers.io())
.flatMap(x -> Observable.just(x,"pause"))
.observeOn(Schedulers.computation())
.map(x -> someHeavyCaculation(x))
.subscribe(x -> Log.d(TAG, x));

这段代码中各个操作符是在哪些线程中进行的?
我们看下答案:

1
2
3
4
5
6
7
Observable.just("1","2","3") //IO 线程
.map(x -> x.length()) //IO 线程
.subscribeOn(Schedulers.io())
.flatMap(x -> Observable.just(x,"pause")) //IO 线程
.observeOn(Schedulers.computation())
.map(x -> someHeavyCaculation(x)) //computation 线程
.subscribe(x -> Log.d(TAG, x)); //computation 线程

所以我们看到了,observeOn 后面的所有操作都会在这个线程工作。subscribeOn 会从这个Observable生成一直到遇到其他 observeOn。所以 observeOnsubscribeOn 的位置非常关键。

当然,这点问题 扔物线大神在文章中也详细的讲到了:

因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 ——扔物线

只不过很多人看帖子看一半,扔物线大神把这最重要的部分放到了lift()后面讲,很多人看完枯燥的lift()后,选择性忽视了最后关于 Scheduler非常关键的这部分。

1.2 subscribeOn只发生一次,observeOn可以使用多次

如果程序需要多次切换线程,使用多次observeOn是完全可以的。 而subscribeOn只有最上方的subscribeOn会起作用。这点扔物线大神的文章也补充过,大家可以回头再重温一下。

1.3 不是所有操作符都会在默认线程执行

很多操作符的默认执行线程并不是当前线程,这类操作符有一个特征就是会提供带有 Scheduler 参数的重载方法,比如 intervalinterval 会默认在computation线程执行,如果你在后面加上subscribeOn。 他还是会在computation线程执行,你只有在重载方法里加入其他 Scheduler,他才会在其他线程执行。如果你仔细看过 RxJava 的 JavaDoc。 他都会明确写出这个操作符的默认工作线程。

2 如果可能,避免使用Subject 当然包括RxBus!!

subject 作为Observable的结合体,在使用时非常方便。但是在使用时,很多时候并不尽人意。

2.1 Subject 的行为是不可预期的

Subject 由于暴露 onNext 方法。非常难控制。任何有这个subject引用的对象都可以使用这个方法传输数据,任何订阅了subject的人都可以接收到这个数据。
这导致你订阅Subject后几乎不清楚数据来源到底是谁。甚至也不知道你收到的到底是什么数据。这也是为什么我强烈抵制 RxBus 的一大原因。
Subject 由于自己是Observable, 他遵循Observable Contract。 如果其中某个事件出现异常,onError触发,那么这个Subject将再也不能使用。当然这点可以使用 Jake Wharton的 RxRelay来解决。 RxRelay就是一个没有onComplete和onError的Subject。
所以如果你的程序中必须使用Subject, 推荐将其设为 private field并且对外只暴露他的Observable形式。

2.2 Subject 默认是Hot Observable

关于 hot/cold Observable我在这篇文章中详细的解释过: 拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区

Subject默认是热的,也就是说你发送的信息接收者是否接受的到是不一定的。是需要根据情况分析的。具体可以看我关于hot/cold Observable 的文章。

2.3 Again。 不要在继续使用RxBus了

RxBus 几乎都是基于Subject的再次封装。使得他不仅拥有了Subject是所有缺点还加入了很多缺点,比如他不是类型安全的。 我见过太多的RxBus封装都只是一个接受Object类型的Subject。这个问题当然也有很多RxBus通过 键值对或ofType()等等操作解决。再比如RxBus更容易造成内存泄漏(因为需要将所有事件和订阅者存储在Subject中,)。更多欢迎再次看一下我的第一篇关于RxJava的文章: 放弃RxBus,拥抱RxJava(一)

前几天我在Reddit上看到一个人的回复:

I think EventBus on android is popular because people don’t know how to share a java object reference between android components like a Fragment and an Activity, or 2 Activities and so on. So basically I think people don’t know how 2 Activites can observe the same object for data changes which I think comes from the fact that we still don’t know how to architect our apps properly.

我认为 EventBus在Android上火爆的原因是人们不知道怎么去在Android组件,例如Activity/Fragment之间共享一个Java对象的引用。

想一想,自己使用EventBus是不是也是这个原因呢?

3 如果你还在使用RxJava 1.x 建议尽快升级2.x版本

RxJava 2.x 更新了很多新内容。比如将Backpressure机制分离出来做成Flowable等等。 而且RxJava 1.x马上要寿终正寝,进入不再更新的模式(2017年6月)。所以还在使用RxJava 1.X 的同学们尽快更新吧。
如果你仍然处于某种原因,必须使用RxJava 1.x, 那么也千万不要使用Observable.create(Observable.OnSubscribe<T> f) 操作符(现已经被Deprecated)创建Observable。使用其他工厂方法或者直接升级为RxJava 2.x (现已经更新到2.1.0)才是正确的选择。

4 关于操作符

4.1 尽量避免过多的使用操作符,能合并的操作符尽量合并。

这里的合并不是指使用ObservableTransformer合并。而是指在逻辑上合并,比如:

1
2
3
4
Observable.just("Hello","World","RxJava")
.map(x -> x.length())
.map(x -> x + 2)
.subscribe(/**********/)

这里的两个map明显可以写成一个。 我们知道,每个操作符都会根据操作符的特性生成新的Observable,订阅他的上游然后给下游发送数据,避免使用过多的操作符可以降低内存抖动。
所以我不是很推荐使用ObservableTransformer来合并出来一个

1
2
3
ObservableTransformer applyThread = upstream ->
upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

这样虽然在写法上更简单了。但是损失了observeOn的灵活性还额外增加OverHead。得不偿失。当然,如果我们使用Transformer来进行模块解耦,这当然是非常值得的。详细可以参考我的上一篇文章:
动手做一个Full Rx App

4.2 flatMap并不保证发射顺序。

flatMap是将他每个得到的item转换成一个Observable,然后通过merge融合这些Observable。但是每个对应的Observable发射出去的一个或多个项目并不是完全有序的。如果想要保证发射顺序,使用concatMap。同理,merge操作符也不保证顺序,如果需要有序,使用concat

4.3 如果不是必要,不要在flatMap中使用过多的操作符。

我们刚才说了,每个item都会生成一个新的Observable,每个操作符也会。所以如果你的flatMap中有其他操作符,比如下面的代码:

1
2
3
4
Observable.fromIterable(list)
.flatMap(x -> Observable.just("x",x,"y")
.map(item -> "item" + item))
.subscribe();

如果你的list中有上万个item。 那么你将会调用这个map上万次,多生成上万个ObservableMap来进行这个操作。 我们可以简单的消除这个OverHead。 将map拿出来,如下:

1
2
3
4
Observable.fromIterable(list)
.flatMap(x -> Observable.just("x",x,"y"))
.map(item -> "item" + item)
.subscribe();

这样flatMap后的Observable会统一进行管理。省去了那上万个ObservableMap。

4.4 如果不是必要,不要自己写 Observable 的操作符。

Observable的每个操作符都有着很复杂的逻辑,就连很多RxJava的专家都会出错。如果你真的想写自己的操作符,我建议你首先阅读这个文章:
Writing operators for 2.0
详细的介绍了如何写操作符,要遵顼哪些规则。 顺便一提,这个文章有将近1700行,还有三个主要模块处于TBD阶段,并没有完全补充。写操作符的难度可想而知。

小彩蛋:RxJava和Kotlin我最近碰到的一个坑

最近使用RxJava和Kotlin。 我将

1
startWith(idleState());

写成了

1
startWith{idleState()};

在我的IDE几乎肉眼难以分辨的区别。由于kotlin的lambda规则,{} 把我需要的变量解析成了lambda表达式,又由于正好是单参数的lambda,可以省略参数转换为 it 的写法。导致我这句仍然可以编译,但重载了错误的操作符。导致整个链条崩溃。那些年和我说kotlin语法更为安全的人你过来我给你加个BUFF!

拥抱RxJava(四):动手做一个Full Rx的 注册界面

背景/Background

前阵子不久,Jake Wharton 在Devoxx 的演讲: The State of Managing State with RxJava 中提出了一个类似于Redux的 Full Rx 的App 结构。 如下图:

Redux

整个结构全部由RxJava控制 state。 和传统MVX结构类似,也是大致分UI层(View),中间层(Presenter/ViewModel/Controller或者我更喜欢叫Translator)和数据层(Model)。大致流程如下:

  1. Ui层(View)层将用户输入数据打包成UiEvent传递给中间层
  2. 中间层(Translator)将Event处理成对应的Action交给数据处理层。
  3. 处理结果打包成对应的Result交还给Translator
  4. Translator将数据结果打包成对应的UiModel交换给Ui做对应的Ui显示。

实现/Demo

我们先一步一步写个Demo看下这个结构的优缺点吧!
为了方便,我直接使用Android Studio提供的LoginActivity模板。
我们的目的是要做一个注册界面,为了简化只有用户名,密码。首先我们来定义Event:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AuthEvent {
public final static class SignUpEvent extends AuthEvent {
private final String username;
private final String password;
public SignUpEvent(String username, String password) {
this.username = username;
this.password = password;
}
//... getters
}
}

这里SignUpEvent继承自AuthEvent是为了统一逻辑。这样我们可以在一整条stream里实现我们所有的逻辑。
我们在Ui层将这个Event打包(这里我使用RxBinding):

1
2
3
Observable<SignUpEvent> click = RxView.clicks(mEmailSignInButton)
.map(ignore -> new SignUpEvent(mEmailView.getText().toString(),
mPasswordView.getText().toString()));

这样我们每次点击按钮就会发射一个SignUpEvent出来。

再来我们定义我们的UiModel,我们首先要想好,我们的Ui到底有几种状态,我们将各种状态提前定义。我大致觉得我们需要四种状态:

  1. idle 初始状态,就是用户第一次进入的状态
  2. inProcess 状态,也就Ui界面等待注册是否成功的状态
  3. success 状态,注册成功 进行下一步操作
  4. fail 状态,注册失败,返回失败信息。

根据这四种状态,我们来定义UiModel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AuthUiModel {
private final boolean inProcess;
private final boolean usrValidate;
private final boolean pwdValidate;
private final boolean success;
private final String errorMessage;
private AuthUiModel(boolean inProcess, boolean usrValidate, boolean pwdValidate, boolean success, String errorMessage) {
this.inProcess = inProcess;
this.usrValidate = usrValidate;
this.pwdValidate = pwdValidate;
this.success = success;
this.errorMessage = errorMessage;
}
public static AuthUiModel idle() {
return new AuthUiModel(false, true, true, false, "");
}
public static AuthUiModel inProcess() {
return new AuthUiModel(true, true, true, false, "");
}
public static AuthUiModel success() {
return new AuthUiModel(false, true, true, true, "");
}
public static AuthUiModel fail(boolean username, boolean password, String msg) {
return new AuthUiModel(false, username, password, false, msg);
}
//... getters
}

再来是Model层,我们这里用一个简单的AuthManager来管理,解耦出来后这里可以替换成任意你喜欢的注册方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AuthManager {
private SignUpResult result;
private Observable<SignUpResult> observable = Observable.fromCallable(() -> result)
//延迟2s发送结果,模拟网络请求延迟
.delay(2000, TimeUnit.MILLISECONDS);
public Observable<AuthResult.SignUpResult> signUp(SignUpAction action) {
//检查用户名是否合法
if (TextUtils.isEmpty(action.getUsername()) || !action.getUsername().contains("@")) {
result = SignUpResult.FAIL_USERNAME;
}
//检查密码合法
else if (TextUtils.isEmpty(action.getPassword()) || action.getPassword().length() < 9) {
result = SignUpResult.FAIL_PASSWORD;
} else {
//检查结束,返回注册成功的信息
// TODO: createUser
result = SignUpResult.SUCCESS;
}
return observable;
}
}

这里SignUpAction里定义了我们注册所有需要的信息,代码和SignUpEvent几乎雷同。但是分离的好处是可以对数据进行在处理或者合并打包等等。

Ui和Model都准备好了,我们开始我们的Translator部分。 Translator部分主要又ObservableTransformer组成。 将各个部件组装,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final ObservableTransformer<SignUpEvent, AuthUiModel> signUp
//上游是UiEvent,封装成对应的Action
= observable -> observable.map(event -> new SignUpAction(event.getUsername(), event.getPassword()))
//使用FlatMap转向,进行注册
.flatMap(action -> authManager.signUp(action)
//扫描结果
.map(signUpResult -> {
if (signUpResult == SignUpResult.FAIL_USERNAME) {
return AuthUiModel.fail(false, true, "Username error");
}
if (signUpResult == SignUpResult.FAIL_PASSWORD) {
return AuthUiModel.fail(true, false, "Password error");
}
if (signUpResult == SignUpResult.SUCCESS) {
return AuthUiModel.success();
}
//TODO Handle error
throw new IllegalArgumentException("Unknown Result");
})
//设置初始状态为loading。
.startWith(AuthUiModel.inProcess())
//设置错误状态为error,防止触发onError() 造成断流
.onErrorReturn(error -> AuthUiModel.fail(true, true, error.getMessage())));

这样我们在Activity里 将各个部分通过Translator组装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
disposables.add(click.compose(translator.signUp)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(authUiModel -> {
//载入进度条
mProgressView.setVisibility(authUiModel.isInProcess() ? View.VISIBLE : View.GONE);
//判断用户名/密码是否合法
if (!authUiModel.isPwdValidate()) {
mPasswordView.setError(authUiModel.getErrorMessage());
} else {
mPasswordView.setError(null);
}
if (!authUiModel.isUsrValidate()) {
mEmailView.setError(authUiModel.getErrorMessage());
} else {
mEmailView.setError(null);
}
//是否成功
if (authUiModel.isSuccess()) {
Toast.makeText(this, "CreateUser SuccessFull", Toast.LENGTH_SHORT)
.show();
}
}));

很明显的看到,在Activity中 只有Ui相关的处理,而中间的逻辑通过translator解耦出来,对Activity不可见。

问题/Issues

但是,问题来了。这里些许Bug.由于我们使用Transformer. 每次转屏的时候会通过RxView来生成新的Observable.这样我们的translator并没有复用,还是绑定在了生命周期上。那么如何解决?

我们设想一下,如果中间的Translator可以随时接受下游的订阅而且无论下游是否有订阅,他都可以一直运行,这样不就在下游彻底解耦了吗?这种特性的Observable我在上一篇文章中说到是ConnectableObservable。这里我们使用Replay(1)。这样我们就每次重新订阅,也会获得最近的一次UiModel,再也不用担心转屏/内存重启。

下游解决了,那上游呢?如果上游每次调用这个Transformer,每次还是一个新的Observable啊。理想的情况应该是我们有一个中间人,他不断接受Ui层传过来的UiEvent然后交给我们Transformer, 这样我们就能一直复用我们的Transformer。也就是他既作为一个Observer订阅上游UiEvent又作为一个Observable,给下游传递数据。那么答案呼之欲出,我们需要一个Subject作为中间人。
改善后的Translator代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class AuthTranslator {
private AuthManager authManager;
private Subject<SignUpEvent> middle = PublishSubject.create();
private Observable<AuthUiModel> authUiModelObservable
= middle.map(event -> new SignUpAction(event.getUsername(), event.getPassword()))
//使用FlatMap转向,进行注册
.flatMap(action -> authManager.signUp(action)
//扫描结果
.map(signUpResult -> {
if (signUpResult == SignUpResult.FAIL_USERNAME) {
return AuthUiModel.fail(false, true, "Username error");
}
if (signUpResult == SignUpResult.FAIL_PASSWORD) {
return AuthUiModel.fail(true, false, "Password error");
}
if (signUpResult == SignUpResult.SUCCESS) {
return AuthUiModel.success();
}
//TODO Handle error
throw new IllegalArgumentException("Unknown Result");
})
//设置初始状态为loading。
.startWith(AuthUiModel.inProcess())
//设置错误状态为error,防止触发onError() 造成断流
.onErrorReturn(error -> AuthUiModel.fail(true, true, error.getMessage())))
.replay(1)
.autoConnect();
public final ObservableTransformer<SignUpEvent, AuthUiModel> signUp
//上游是UiEvent,封装成对应的Action
= observable -> {
//中间人切换监听
observable.subscribe(middle);
return authUiModelObservable;
};
public AuthTranslator(AuthManager authManager) {
this.authManager = authManager;
}
}

这样我们刚才说的两个Bug就解决了。而且即使我们在请求中转屏,也毫无问题。

总结

实践一下这个结构确实有很多优点。

  1. 将一整条state stream解耦分成几块,但又保持了一整条的结构。
  2. 相比传统MVX模式,多次控制翻转(Ioc),解耦更彻底
  3. 由于RxJava强大的操作符群。可以实现很多意想不到的功能

缺点也蛮明显:

  1. 我个人对这个架构理解也不是特别深入,中间的middle部分虽然用Subject 但是确实有其不稳定性,比如onError/onComplete会停止这个Subject造成断流
  2. 由于解耦彻底,造成需要很多辅助类,茫茫多的boilerplate。 不过这个在kotlin上有很好的发挥,sealed class,when 等语法几乎是为其量身定做。
  3. 难,真的难。比传统MVP,甚至MVVM需要更清晰,更合理的设计。不提前想好use case就开始写几乎是不可能的。而且RxJava如果不熟悉,调试起来确实很难。经常不能定位到代码。最好做单元测试各个模块。

最后附上这个Demo 的GitHub Repo: RxAuthDemo

拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区

前两篇文章 放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus放弃RxBus,拥抱RxJava(二):Observable究竟如何封装数据? 写了一堆理论。看起来并没有什么实际用处,那么今天。我们实战一下,来封装我们需要的数据,并且了解一下各种方式具体的区别。

前言: 很多朋友误会我文章的意思。我写这个系列文章的意思主要是帮助了解一下RxJava的常见用法。而不是使用一下自己或别人封装好的RxBus就觉得自己的项目使用RxJava了。但是这也仅仅是个人口味问题,很多情况下确实RxBus/EventBus会很方便,很刺激,很上瘾。所以从这篇文章开始,我把标题中的”放弃RxBus”去除。

无论在简书,微信平台,GitHub等等分享平台。一个名字上写着 “MVP(MVVM) + RxJava + Retrofit + Dagger2 + ……..”这样的名字,再熟悉不过了。然而,大多数情况进去看一下RxJava部分。要么就是简单的把取到的数据用Observable.just()直接传给下一层,要么就是直接使用Retrofit的Adapter来直接获得Observable,而app中其他部分并没有reactive。而且还有很多Observable用法错误,比如冷热不分,连续太多的Map/FlatMap等等。

0. RxBus/Retrofit 足够用了,我为什么要让自己的App 更加的Reactive?

为什么不用RxBus我已经写了两篇文章了,可能由于我不常写文,很多人并没有理解。在这里我再解释一次:EventBus如果是一辆穿梭在所有代码之间的公交车。那么Observable就是穿梭在少许人之间的Uber专车。他比起EventBus有很多优势,比如异常处理,线程切换,强大的操作符等等。你当然可以做出一辆超级Uber来当全局公交车(RxBus)使用,然而这却损失了RxJava本来的许多优势,并且又给自己挖了许多坑。

0.1 一个常见误区,过多的operator

刚开始使用RxJava的时候,我们会觉得operator的链式调用会非常的爽,一个简单的例子:

1
2
3
4
5
6
7
8
Observable.just("1", "2", "3", "4", "5", "6", "7")
.map(x -> Integer.valueOf(x))
.map(x -> x * 2)
.map(x -> x + 4)
.filter(x -> x >2)
// and much more operators
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

当你只有很少数据的时候,这样当然可以,但是你数据量上来的时候,这就会有很多的overhead。 其实几乎所有的operator都会给你生成一个新的Observable。所以在上面这个例子中,我们在过程中生成了至少7个Observable。然而我们完全可以将中间的.map().map().map().filter合并在一个FlatMap中,减少很多的overhead。

1. Observable.just()的局限性。

  1. 使用Observable.just() 即使你没有调用subscribe方法。just()括号里面的代码也已经执行了。显然,Observable.just()不适合封装网络数据,因为我们通常不想在subscribe之前做网络请求。
    举个例子:
1
2
3
4
5
6
class TestClass{
TestClass(){
System.out.println("I'm created!");
}
}
Observable.just(new TestClass());

这时你运行代码,你就看到确实你的TestClass 已经被创建了:

1
I/System.out: I'm created!

当然,这个可以简单的用defer()/fromCallable()/create()操作符来是实现只有subscribe只有才加载。
比如:

1
2
3
4
// use fromCallable
Observable.fromCallable(TestClass::new);
//or
Observable.defer(() -> Observable.just(new TestClass()));
  1. Observable.just()不够灵活。虽然说设计模式上我们追求 “Minimize Mutability” 但是如果我们的程序越来越 reactive的时候。一个 ObservableJust 往往是不满足需求的。比如之前一定订阅的subscriber。如果数据更新了,你不可以同过ObservableJust 来通知所有的Observable 新数据更新了,需要你的subscriber主动更新。这显然有悖于我们追求的reactive programming。 主动pull数据而不是数据告诉你,我更新了然后再做出反应。

当然ObservableJust在很多情况下,确实不错。如果你不需要监听后续的更新,那么ObservableJust可以满足你的需求。

2. Hot Observable 和 cold Observable

很多人在封装数据的时候,并没有太多考虑冷热的问题,通常情况下并不会出错。因为目前很多开源项目(Demo)里除了RxBus,并没有太多的RxJava的实时情况。然而,当你的App越来越Reactive的时候,冷热便是一个必须考虑的问题。
Hot Observable 意思是如果他开始传输数据,你不主动喊停(dispose()/cancel()),那么他就不会停,一直发射数据,即使他已经没有Subscriber了。而Cold Observable则是subscribe时才会发射数据。
然而,问题来了。我上篇文章讲过,只有subscribeActual方法调用了的时候,Observable发射数据,那为什么Hot Observable没有Subscriber也会发射数据,他把数据发射给谁了呢?我们在解决这个问题之前,先看一下Cold Observable:

2.1 Cold Observable

我们常见的工厂方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer()。 他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的,举个例子:

1
Observable interval = Observable.interval(1,TimeUnit.SECONDS);

如果我们有两个subscriber,那么他们会各自有自己的计时器,并且互不干扰。效果如下图:

2.2 Hot Observable

不同于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的所有subscriber,他们会在同一时刻收到相同的数据。我们通常使用publish()操作符来将ColdObservable变为Hot。或者我们在RxBus中常常用到的Subjects 也是Hot Observable。
刚刚我们刚刚提出了一个问题,

既然Hot Observable在没有subscriber的时候,还会继续发送数据,那么数据究竟发给谁了呢?

其实Hot Observable其实并没有发送数据,而是他上层的Observable 发送数据给这个hot Observable。不信?我们来分别看一下:

2.2.1 ConnectableObservable

我们在上面的误区中知道了,几乎所有operator都会生成一个新的Observable。publish当然不例外。但是有区别的是,publish会给你一个ConnectableObservable。具体实现类是ObservablePublish。这个Observable的区别是他提供一个connect()方法,如果你调用connect()方法,ConnectableObservable就会开始接收上游Observable的数据。我们来测试一下:

1
2
3
ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();

果然,由于我们subscribe晚了一些。0这个数据没有收到,当我们两个 Subscriber 都dispose的时候,ConnectableObservable 也仍在接受数据,导致我们6这个数据没有接收到。
ConnectableObservable 其实在内部,有一个PublishObserver,他有两个作用。一个是当我们调用 connect()方法时, PublishObserver开始接受上游的数据,我们的例子里便是 Observable.interval(1, TimeUnit.SECONDS) 。所以才能在我们没有调用 subscribe方法时,他也能开始发送数据。第二个作用是 PublishObserver存储所有的下游Subscriber, 也就是我们例子中的Subscriber1 和Subscriber2,在 PublishObserver 每次接到一个上游数据,就会将接收到的结果,依次分发给他存储的所有 Subscribers ,如果下游 Subscriber 调用了 dispose方法,那么他就会在自己的缓存中删除这个 Subscriber,下次接受到上游数据便不会传给这个Subscriber
那么这时候,有同学应该要问了:

我们可不可以停止从上游接受数据?

我们当然可以。但是从设计的角度,RxJava为了提供链式调用。 connect()方法会返回一个 Disposable 给我们来控制是否继续接受上游的数据。

2.2.2 ConnectableObservable的常用操作符

我们当然不希望每次都手动控制 ConnectableObservable的开关。RxJava给我们提供了一些常用的控制操作符

  1. refCount()
    refCount()可以说是最常用的操作符了。他会把 ConnectableObservable变为一个通常的Observable但又保持了HotObservable的特性。也就是说,如果出现第一个Subscriber,他就会自动调用 connect()方法,如果他开始接受之后,下游的 Subscribers全部dispose,那么他也会停止接受上游的数据。具体看图:

    每个 Subscriber 每次都会接受同样的数据,但是当所有 subscriber 都 dispose时候,他也会自动dipose上游的 Observable 。所以我们重新subscribe的时候,又重新从0开始。
    这个操作符常用到,RxJava将他和publish合并为一个操作符 :share()

  2. autoConnect()
    autoConnect()看名字就知道,他会自动链接,如果你单纯调用 autoConnect() ,那么,他会在你链接第一个 Subscriber 的时候调用 connect(),或者你调用 autoConnect(int Num),那么他将会再收到Num个 subscriber的时候链接。
    但是,这个操作符的关键在于,由于我们为了链式调用,autoConnect会返回Observable给你,你不会在返回方法里获得一个 Disposable来控制上游的开关。 不过没问题,autoConnect提供了另一种重载方法 :
    autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
    他会在这个 Consumer传给你 你需要的那个总开关。而且,autoConnect并不会autoDisconnect, 也就是如果他即使没有subscriber了。他也会继续接受数据。

  3. replay()
    replay()方法和 publish()一样,会返回一个 ConnectableObservable,区别是, replay()会为新的subscriber重放他之前所收到的上游数据,我们再来举个例子:
1
2
//only replay 3 values
Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();

果然,Subscriber2在subscribe时候,立即收到了之前已经错过的三个数据,然后继续接受后面的数据。
但是,这里有几点需要考虑:replay() 会缓存上游发过来的数据,所以并不需要担心重新生成新数据给新的 Subscriber。

  1. ReplayingShare()
    其实ReplayingShare并不能算是ConnectableObservable的一个操作符,他是JakeWhaton的一个开源库,只有百来行。实现的功能是几乎和replay(1).refCount()差不多。但是如果中断 Conncection之后,重新开始subscribe,他仍然会给你一个重放他上一次的结果。 具体看图:

    我们看到和刚才的replay不同,即使两个Subscriber都 dispose, 重新开始仍然会接收到我们缓存过的一个数据。
2.3 Subjects

Subjects 作为一个Reactive世界中的特殊存在,他特殊在于他自己既是一个Observable又是一个Observer(Subscriber)。你既可以像普通Observable一样让别的Subscriber来订阅,也可以用Subjects来订阅别人。更方便的是他甚至暴露了OnXX(),方法给你。你直接调用可以通知所有的Subscriber。 这也是RxBus的基础,RxBus几乎离不开Subjects。 蜘蛛侠的老爹告诉我们,力量越大,责任就也大。Subjects也一样。 Subjects因为暴露了OnXX()方法,使得Subjects的数据来源变得难以控制。而且,Subjects一直是HotObservable,我们来看下Subject的OnNext()方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void onNext(T t) {
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishDisposable<T> s : subscribers.get()) {
s.onNext(t);
}
}

可以看出来Subjects只要调用了OnNext()方法就会立即发送数据。所以,使用时一定要注意Subjects和Subscriber的链接时序问题。具体Subjects的用法我想介绍帖子已经足够多了。这里就不赘述了。

3. 在Android中常见的几种封装和注意事项

1.封装View 的Listener

View 的各种Listener 我们常用create方法来封装,比如OnClickListener:

1
2
3
4
Observable.create(emitter -> {
button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
emitter.setCancellable(() -> button.setOnClickListener(null));
});

这里非常关键的一点是一定要设置解除绑定,否则你将持续使用这个会造成内存泄漏。而且最好配合使用share()。否则只有最后一个Subscriber能收到OnClick。当然,如果不考虑方法数的话,推荐配合使用RxBinding。

而且,用create()方法封装Listener适合几乎所有的callback, 并且安全。

2.封装简单的数据源

设想一个场景,我们有一个User类。里面有我们的用户名,头像,各种信息。然而在我们的app中,可能有三四个Fragment/Activity需要根据这个User做出不同的反应。这时我们就可以简单的使用Subject来封装User类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class UserRepository {
private User actualUser;
private Subject<User> subject = ReplaySubject.createWithSize(1);
/**
*
*Get User Data from wherever you want Network/Database etc
*/
public Observable<User> getUpdate(){
return subject;
}
public void updateUser(User user){
actualUser = user;
subject.onNext(actualUser);
}
}

如果我们某些模块需要这个User,那么只需要subscribe到这个Repository,如果User有更新,每一个Subscriber都会收到更新后的User并且互相不影响。而且我们使用ReplaySubject,即使有新的Subscriber,也会收到最新的一个Subject。
但是使用的时候一定要注意,因为用的是Subject.所以在onNext方法中一旦出现了error。那么所有的Subscriber都将和这个subject断开了链接。这里也可以用RxRelay代替Subject,简单来说Relay就是一个没有onError和onComplete的Subject。

3.简单的使用concat().first()来处理多来源

Dan Lew在他的博客Loading data from multiple sources with RxJava 中介绍过他这种处理方法,

1
2
3
4
5
6
7
8
9
// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;
// Retrieve the first source with data
Observable<Data> source = Observable
.concat(memory, disk, network)
.first();

然后在每次做不同请求的时候刷新缓存

1
2
3
4
5
6
7
8
Observable<Data> networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});

具体也可以看这篇简书,我也不在过多赘述 :RxJava(八)concat符操作处理多数据源

4.自己继承Observable 手动写subscribeActual()方法

这可能是最灵活的写法?如果你想用RxJava封装自己的库,推荐这种方法封装。因为这样不仅仅可以有效的进行错误处理,并且不会暴露过多逻辑给外面,许多优秀的RxJava相关库都是这样封装,就连RxJava自己也是把一个个的operator封装成一个个不同的Observable。但是这种方法确实要求很高,要做很多考虑,比如异步,多线程冲突,错误处理。对新手不是很推荐。

放弃RxBus,拥抱RxJava(二):Observable究竟如何封装数据?

上篇简单讲到了一些关于Event/Rx bus的优缺点。并且提到了如何“正确”使用RxJava,而不是使用RxBus来自己重新发明轮子。

放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus

其中也讲到了一个简单使用 create() 方法来进行封装Observable。但也留下了许多坑,比如内存泄漏,不能Multicast(多个Subscriber订阅同一个Observable) 等问题。所以这篇,我们接着通过这个例子,来具体了解下,如何封装Observable。

1. Observable提供的静态方法都做了什么?

首先我们来简单看一下Observable的静态方法,just/from/create都怎么为你提供Observable。
我们先看just:

1
2
3
4
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

我们暂时不需要纠结 RxJavaPlugins.onAssembly() 这个方法。比较重要的是 just(T item) 方法会为你提供一个 ObservableJust(item) 的实例,而这个 ObservableJust 类,就是一个RxJava内部的实现类。
在 RxJava 2.x 中 Observable 是一个抽象类,只有一个抽象方法,subscribeActual(Observer observer);(但是Observable的源码足足有13518行!!!)

1
2
3
4
5
6
7
public abstract class Observable<T> implements ObservableSource<T>{
//implemented methods
protected abstract void subscribeActual(Observer<? super T> observer);
//other implements/operators
}

那么ObservableJust这个类究竟什么样呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}

我们首先看到构造方法里,直接把value赋给了ObservableJust的成员。这也就是为什么Observable.just()里的代码会直接运行,而不是像create()方法,有Subscriber时候才能运行。
再来看看两个item的just(T item1,T item2):

1
2
3
4
5
6
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}

诶?怎么画风突变?不是ObservableJust了?其实除了只有一个item的just,其他的just方法也都是调用了这个fromArray。那我们来看看这个fromArray:

1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

前面一些check我们忽略,这里我们发现一些熟悉的身影了ObservableFromArray(items)。又一个Observable的实现类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
//implements
}
}

是不是更熟悉?其实Observable几乎所有的静态方法都是这样,甚至包括一些著名的RxJava库比如RxBinding,也都是使用这种封装方法。内部实现Observable的subscribeActual()方法。对外只提供静态方法来为你生成Observable。为什么这么做,我们来了解一下subscribeActual()方法。

2. subscribeActual() 究竟是什么?

subscribeActual()其实就是Observable和Observer沟通的桥梁。这个Observer(Subscriber)就是你在Observable.subscribe()方法里写的那个类,或者是Consumer(只处理onNext方法)。

1
2
3
4
5
6
7
8
9
10
11
12
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
} catch (Throwable e) {
}
}

我们看到其实这个方法除了Check和Apply就只有这一行subscribeActual(observer),连接了Observable和Observer。所以我们知道了,subscribeActual()方法里的代码,只有在subscribe()调用后,才回调用。

那么他们是如何链接的呢?其实很简单,根据你的逻辑一句一句的调用observer.onXX()方法就可以了。比如刚才我们看到的ObservableJust:

1
2
3
4
5
6
7
8
9
10
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}

再比如我们的ObservableFromArray:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}

复杂点的例子,比如如何封装button的OnClick事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}
}

但是细心的同学应该看到了,每个subscribeActual()方法里,都会有 observer.onSubscribe(disposable)这句。那么这句又是做什么的呢?
Disposable其实就是控制你取消订阅的。他只有两个方法 dispose() 取消订阅,和 isDisposed() 来通知是否已经取消了订阅。
取消订阅时,要根据需求释放资源。在subscribeActual()里逻辑要严谨,比如onComplete()之后不要有onNext()。需要注意的点很多,所以可能这也就是为什么RxJava推荐用户使用静态方法生成Observable吧。

最后再说一下几点:

  • create()方法:其实create()方法相当于你把subscribeActual中的代码,写到了create里而已。所以有很高的操控性。
  • Flowable:Floawble其实在实现上和Observable类似,区别是Observable同过 Disposable控制取消订阅。而Flowable同过Subscription。其中还需要request()方法控制流量。具体关于这个问题,我推荐这篇文章

    给初学者的RxJava2.0教程

总结:

  • 我们从源码分析角度来说,RxJava 2.x 也是同过subscribeActual来链接Observable和Observer(Subscriber)。本质上和Listener没什么太大区别。但是,RxJava的确是诸多一线Java/Android开发者的结晶。丰富的操作符,线程调度等等诸多优势。而且保证类型安全。
    如果你还是停留在仅仅使用RxJava来实现一个RxBus,是不是有点杀鸡用牛刀呢?

放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus

EventBus和Otto在之前作为Android组件间通信工具,简单方便十分受欢迎,但是也非常容易Abuse。大概有如下几个缺点:

  • 由于是Event,在发布Event的时候就要做好准备可能并没有人接受这个Event, Subscribe的时候也要做好准备可能永远不会收到Event。Event无论顺序还是时间上都某种程度上不太可控。如果你将数据寄托在Event上然后就直接在Android其他生命周期方法中直接使用这个数据或成员变量。那么很有可能你会得到NPE。
  • EventBus看似将你的程序解耦,但是又有些过了。我们常常使用EventBus传数据,这已经是Dependency级别的数据而不是一个可以被解耦出来的模块。这样就造成了过多EventBus的代码会造成代码结构混乱,难以测试和追踪,违背了解耦的初衷。这时如果有意或无意的造成了Nested Event。那情况会更糟。

由于EventBus的种种缺点,以及后面RxJava的出现。很多人都开始使用RxJava来取代EventBus。甚至Otto的官方介绍里都写到:

Deprecated!

This project is deprecated in favor of RxJava and
RxAndroid. These projects permit the same event-driven
programming model as Otto, but they’re more capable and offer better control of threading.

If you’re looking for guidance on migrating from Otto to Rx, this post
is a good start.

链接是一个教你怎么使用RxJava来自己手动写一个RxBus来代替EventBus的文章。虽然看起来是在用RxJava。但是实际上却仍然在用EventBus。甚至这个封装其实也并没有GreenRobot或者Otto来的好。
我们看看Jake Wharton对RxBus的评价:

我想”RxBus”唯一的好处就是他是一个Rx的入门毒品。否则的话,你要么就不是在用Rx,要么你需要更加惯用的Rx资源 (渣翻译见谅)

再来一个GitHub的:

subscribeActual部分我们先不考虑。然而Jake指出最好不要使用Relay来“重新发明”Event Bus.

这里看图说话:
Jake Wharton在GOTO 2016 上的讲座中提到,我们正常的Android编程是这样的:

我们像一个中间人一样。
而使用RxJava。 我们的结构,更像这样

我们使用RxJava来直接把组件相连,对所接受到的数据作出反应,所谓的 “Reactive”。
而使用Eventbus? Jake 没说, 我自己画一个:


我们作为一个中间人,传递消息。EventBus作为另一个中间人。帮我们传递消息。(这也就是所谓的“看似解耦”)

再打个比方,虽然我们将EventBus翻译成时间总线,但是其实总线就是Bus也就是公交车。而RxJava更像一个专车,Uber或者滴滴。他直接链接你的两个或多个需要通信的类。传输数据,当然你可以做一个很大的专车,穿梭在所有类之间,也就是所谓的RxBus。所以在这里为什么放弃RxBus也就不言而喻了不是?

那么,问题来了?

怎样才是正确(正常?)的RxJava使用方式?

其实Jake 也在GitHub的讨论上给出了一个答案:

所以应该是,每当你想发布一个Event在EventBus时,直接暴露一个Observable出来。每当你想接受一个Event时,找到这个Observable并且Subscribe他。

这样做的好处是什么?

  • 目标和地点都很明确。你的Subscriber明确的知道他Subscribe的是谁,而且明确的知道我需要作出什么反应。这也正是RxJava的核心“响应式编程”。
  • 由于使用了Observable,对于异常处理将会非常方便。而且还有功能强大全面的Operator来辅助你。
  • 虽然看起来耦合性有所增加。但是这是必要的,上面也说过,EventBus虽然在代码上看似解耦。其实他们还是联系在一起的。而我们这样直接暴露Observable给需要的其他类,这完成了1 -> 1/N的链接,而不需要EventBus这个中间人来传递消息/事件,而且保证我们需要的事件一定会直接到达。

####我们来举个例子

上下两个Fragment,上面的一个EditText,下面的一个TextView。上面的EditText变化的时候下面的TextView也跟着变化。

先把EditText的TextChangedListener封装在Observable里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
stringObservable = Observable.create(e -> editText.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
e.onNext(s.toString());
}
@Override
public void afterTextChanged(Editable s) {
}
}));
/**
***
*/
//Expose Observable
public Observable<String> getEditTextObservable() {
return stringObservable;
}

不习惯自己封装可以使用RxBinding :

1
2
stringObservable = RxTextView.textChangeEvents(editText)
.map(event -> event.text().toString());

再从我们的TextViewFragment中 取到这个封装好的Observable:

1
2
3
4
5
6
7
8
@Override
public void onStart() {
super.onStart();
FragmentEditText fragment = (FragmentEditText) getFragmentManager().findFragmentByTag(FragmentEditText.TAG);
if(fragment != null){
fragment.getStringObservable().subscribe(s -> textView.setText(s));
}
}

来看看效果:

当然,这里还有个问题

  • 由于我们将editText封装在Observable里,无论是create()方法还是使用RxBinding,都会持有这个View的强引用。造成内存泄漏。所以我们一定要在最后加入dispose()方法来释放。所以我推荐使用RxBinding,他已经帮我们在dispose()方法里写好了解除Listener的方法。
  • 因为并没有使用publish操作符,导致多个Subscriber的时候还是有些许问题。可以考虑直接加入.share().

具体我们如何讲常用的数据/Callback封装到Observable中。我会在接下来的文章中写到。

第二篇链接