上篇简单讲到了一些关于Event/Rx bus的优缺点。并且提到了如何“正确”使用RxJava,而不是使用RxBus来自己重新发明轮子。
放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus
其中也讲到了一个简单使用 create() 方法来进行封装Observable。但也留下了许多坑,比如内存泄漏,不能Multicast(多个Subscriber订阅同一个Observable) 等问题。所以这篇,我们接着通过这个例子,来具体了解下,如何封装Observable。
1. Observable提供的静态方法都做了什么?
首先我们来简单看一下Observable的静态方法,just/from/create都怎么为你提供Observable。
我们先看just:
1 2 3 4
| public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
|
我们暂时不需要纠结 RxJavaPlugins.onAssembly() 这个方法。比较重要的是 just(T item) 方法会为你提供一个 ObservableJust(item) 的实例,而这个 ObservableJust 类,就是一个RxJava内部的实现类。
在 RxJava 2.x 中 Observable 是一个抽象类,只有一个抽象方法,subscribeActual(Observer observer);(但是Observable的源码足足有13518行!!!)
1 2 3 4 5 6 7
| public abstract class Observable<T> implements ObservableSource<T>{ //implemented methods protected abstract void subscribeActual(Observer<? super T> observer); //other implements/operators }
|
那么ObservableJust这个类究竟什么样呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
|
我们首先看到构造方法里,直接把value赋给了ObservableJust的成员。这也就是为什么Observable.just()里的代码会直接运行,而不是像create()方法,有Subscriber时候才能运行。
再来看看两个item的just(T item1,T item2):
1 2 3 4 5 6
| public static <T> Observable<T> just(T item1, T item2) { ObjectHelper.requireNonNull(item1, "The first item is null"); ObjectHelper.requireNonNull(item2, "The second item is null"); return fromArray(item1, item2); }
|
诶?怎么画风突变?不是ObservableJust了?其实除了只有一个item的just,其他的just方法也都是调用了这个fromArray。那我们来看看这个fromArray:
1 2 3 4 5 6 7 8 9 10
| public static <T> Observable<T> fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
|
前面一些check我们忽略,这里我们发现一些熟悉的身影了ObservableFromArray(items)。又一个Observable的实现类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { //implements } }
|
是不是更熟悉?其实Observable几乎所有的静态方法都是这样,甚至包括一些著名的RxJava库比如RxBinding,也都是使用这种封装方法。内部实现Observable的subscribeActual()方法。对外只提供静态方法来为你生成Observable。为什么这么做,我们来了解一下subscribeActual()方法。
2. subscribeActual() 究竟是什么?
subscribeActual()其实就是Observable和Observer沟通的桥梁。这个Observer(Subscriber)就是你在Observable.subscribe()方法里写的那个类,或者是Consumer(只处理onNext方法)。
1 2 3 4 5 6 7 8 9 10 11 12
| public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD } catch (Throwable e) { } }
|
我们看到其实这个方法除了Check和Apply就只有这一行subscribeActual(observer),连接了Observable和Observer。所以我们知道了,subscribeActual()方法里的代码,只有在subscribe()调用后,才回调用。
那么他们是如何链接的呢?其实很简单,根据你的逻辑一句一句的调用observer.onXX()方法就可以了。比如刚才我们看到的ObservableJust:
1 2 3 4 5 6 7 8 9 10
| @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } }
|
再比如我们的ObservableFromArray:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } }
|
复杂点的例子,比如如何封装button的OnClick事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Override protected void subscribeActual(Observer<? super Object> observer) { if (!checkMainThread(observer)) { return; } Listener listener = new Listener(view, observer); observer.onSubscribe(listener); view.setOnClickListener(listener); } static final class Listener extends MainThreadDisposable implements OnClickListener { private final View view; private final Observer<? super Object> observer; Listener(View view, Observer<? super Object> observer) { this.view = view; this.observer = observer; } @Override public void onClick(View v) { if (!isDisposed()) { observer.onNext(Notification.INSTANCE); } } @Override protected void onDispose() { view.setOnClickListener(null); } } }
|
但是细心的同学应该看到了,每个subscribeActual()方法里,都会有 observer.onSubscribe(disposable)这句。那么这句又是做什么的呢?
Disposable其实就是控制你取消订阅的。他只有两个方法 dispose() 取消订阅,和 isDisposed() 来通知是否已经取消了订阅。
取消订阅时,要根据需求释放资源。在subscribeActual()里逻辑要严谨,比如onComplete()之后不要有onNext()。需要注意的点很多,所以可能这也就是为什么RxJava推荐用户使用静态方法生成Observable吧。
最后再说一下几点:
- create()方法:其实create()方法相当于你把subscribeActual中的代码,写到了create里而已。所以有很高的操控性。
- Flowable:Floawble其实在实现上和Observable类似,区别是Observable同过 Disposable控制取消订阅。而Flowable同过Subscription。其中还需要request()方法控制流量。具体关于这个问题,我推荐这篇文章
给初学者的RxJava2.0教程
总结:
- 我们从源码分析角度来说,RxJava 2.x 也是同过subscribeActual来链接Observable和Observer(Subscriber)。本质上和Listener没什么太大区别。但是,RxJava的确是诸多一线Java/Android开发者的结晶。丰富的操作符,线程调度等等诸多优势。而且保证类型安全。
如果你还是停留在仅仅使用RxJava来实现一个RxBus,是不是有点杀鸡用牛刀呢?