目录
- 1. 前言
- 2. 正文
-
- 2.1 RxJava 里的观察者模式和普通的观察者模式有什么区别?
- 2.2 RxJava 观察者模式的订阅流程
- 2.3 RxJava 的 map 操作符的流程是什么样子的?
-
- 2.3.1 创建自定义`Observer` 对象
- 2.3.2 创建 `ObservableOnSubscribe` 对象
- 2.3.3 创建 `ObservableCreate` 对象
- 2.3.4 创建封装了变换操作的`Function` 对象
- 2.3.5 创建 `ObservableMap` 对象
- 2.3.6 调用 `ObservableMap` 的 `subscribe` 方法
- 2.3.7 创建 `MapObserver` 对象
- 2.3.8 调用 `ObservableCreate` 对象的 `subscribe` 方法
- 2.3.9 创建 `CreateEmitter` 对象
- 2.3.10 调用 `MapObserver` 对象的 `onSubscribe` 方法
- 2.3.11 `CreateEmitter` 对象调用 `onNext` 以及 `onComplete` 方法
- 2.3.12 `MapObserver` 对象调用 `onNext` 以及 `onComplete` 方法
- 2.4 `subscribeOn()` 流程分析
-
- 2.4.1 创建自定义`Observer` 对象
- 2.4.2 创建 `ObservableOnSubscribe` 对象
- 2.4.3 创建 `ObservableCreate` 对象
- 2.4.4 调用 `Schedulers.io()` 方法
- 2.4.5 调用 `ObservableCreate` 对象的 `subscribeOn`方法
- 2.4.6 调用 `ObservableSubscribeOn` 的 `subscribe` 方法,参数为自定义观察者
- 2.4.7 创建 `SubscribeOnObserver` 对象
- 2.4.8 创建 `SubscribeTask` 对象
- 2.4.9 调用 `IoScheduler` 单例对象的 `scheduleDirect()` 方法
- 2.4.10 调用 `IoScheduler` 单例对象的 `createWorker()` 方法
- 2.4.11 调用 `EventLoopWorker` 对象的 `schedule` 方法
- 2.4.12 调用 `ThreadWorker` 对象的 `scheduleActual` 方法
- 2.4.13 调用 `ScheduledRunnable` 对象的 `run()` 方法
- 2.4.14 调用 `DisposeTask` 对象的 `run()` 方法
- 2.4.15 调用 `SubscribeTask` 对象的 `run()` 方法
- 2.4.16 调用 `ObservableCreate` 对象的 `subscribe()` 方法
- 2.4.17 发射器 `CreateEmitter` 发送 `onNext` 以及 `onComplete` 事件
- 2.4.18 连续多次设置 `subscribeOn()`,哪一次生效?
- 2.5 `observeOn()` 流程分析
-
- 2.5.1 创建自定义 `Observer` 对象
- 2.5.2 创建 `ObservableOnSubscribe` 对象
- 2.5.3 创建 `ObservableCreate` 对象
- 2.5.4 调用 `AndroidSchedulers.mainThread()`方法
- 2.5.5 调用 `ObservableCreate` 对象的 `observeOn` 方法,参数 `HandlerScheduler` 对象
- 2.5.6 创建 `ObservableObserveOn` 对象
- 2.5.7 调用 `ObservableObserveOn` 对象的 `subscribe` 方法,参数为自定义观察者
- 2.5.8 调用 `HandlerScheduler` 对象的 `createWorker` 方法
- 2.5.9 创建 `ObserveOnObserver` 对象,参数自定义观察者,`HandlerWorker` 对象
- 2.5.10 调用 `ObservableCreate` 对象的 `subscribe` 方法,参数为 `ObserveOnObserver` 对象
- 2.5.11 创建发射器,参数为 `ObserveOnObserver` 对象
- 2.5.12 调用 `ObserveOnObserver` 对象的 `onSubscribe` 方法
- 2.5.13 调用发射器的`onNext`方法,值为`a`
- 2.5.14 `HandlerWorker`对象的`schedule`方法,参数为`ObserveOnObserver`对象
- 2.5.15 调用 `ScheduledRunnable` 对象的 `run` 方法
- 2.5.16 调用 `ObserveOnObserver` 对象的 `run` 方法
- 2.5.17 连续多次调用 `observeOn()` ,哪一次生效?
- 3. 最后
1. 前言
从 2016 年开始接触 RxJava 也有几年的时间了,但是并没有去研究过源码。再过几年,会不会还是这样?
最近从公司内部调用到新的工作岗位,工作上的事情比较烦。写下文章总结一下 RxJava,算是让自己静下来的方式吧。
本文针对的是 RxJava2.x 版本,依赖版本:
implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
会从几个方面进行分享:
- RxJava 里的观察者模式和普通的观察者模式有什么区别?
- RxJava 里的观察者模式源码流程
- RxJava 的 map 操作符的流程是什么样子的?
- RxJava 是如何进行线程切换的?
2. 正文
2.1 RxJava 里的观察者模式和普通的观察者模式有什么区别?
先看普通的观察者模式的例子,场景是学生和码农都需要关注鸿洋的公众号,这样他们才能接收到鸿洋的推文:
object NormalObserverPatternDemo {
const val TAG = "NormalObserverPattern"
fun show() {
// 1,创建观察者
val student = Student()
val engineer = Engineer()
// 2,创建被观察者
val hongyangOfficialAccount = HongyangOfficialAccount()
// 3,订阅
hongyangOfficialAccount.addObserver(student)
hongyangOfficialAccount.addObserver(engineer)
// 4,发送事件
hongyangOfficialAccount.notifyObservers("2021/6/19 推文:HarmonyOS 开发者日又双来了")
}
}
/** 抽象被观察者 */
interface Observable {
fun addObserver(observer: Observer)
fun removeObserver(observer: Observer)
fun notifyObservers(arg: Any)
}
/** 抽象观察者 */
interface Observer {
fun update(arg: Any)
}
/** 鸿洋公众号,作为具体被观察者 */
class HongyangOfficialAccount : Observable {
private val observers = mutableListOf<Observer>()
override fun addObserver(observer: Observer) {
if (!observers.contains(observer)) {
observers.add(observer)
}
}
override fun removeObserver(observer: Observer) {
if (observers.contains(observer)) {
observers.remove(observer)
}
}
override fun notifyObservers(arg: Any) {
for (observer in observers) {
observer.update(arg)
}
}
}
/** 学生,作为具体观察者 */
class Student : Observer {
override fun update(arg: Any) {
Log.d(TAG, "学生收到了推文:$arg,赶紧学习吧!")
}
}
/** 码农,作为具体观察者 */
class Engineer : Observer {
override fun update(arg: Any) {
Log.d(TAG, "码农收到了推文:$arg,浏览一下,有个印象吧!")
}
}
打印如下:
NormalObserverPattern: 学生收到了推文:2021/6/19 推文:HarmonyOS 开发者日又双来了,赶紧学习吧!
NormalObserverPattern: 码农收到了推文:2021/6/19 推文:HarmonyOS 开发者日又双来了,浏览一下,有个印象吧!
下面我们思考一下:
学生和码农不关注鸿洋的公众号时,鸿洋公众号还会不会发推文?
答:当然会了,因为他们不关注,洋神可以自己看啊。
接着,我们看 RxJava 的观察者模式的代码(这里我们只需要关注流程,先不去考虑源码实现,2.2 中会分析源码实现的):
// 1, 创建观察者
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
}
override fun onNext(t: String) {
Log.d(TAG, "onNext: t=$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError: ", e)
}
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
}
// 2, 创建被观察者
val observable = Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
// 4, 发送事件
Log.d(TAG, "subscribe: onNext:发送值 a")
emitter.onNext("a")
Log.d(TAG, "subscribe: onNext:发送值 b")
emitter.onNext("b")
Log.d(TAG, "subscribe: onNext:发送值 c")
emitter.onNext("c")
Log.d(TAG, "subscribe: onNext:发送值 d")
emitter.onNext("d")
Log.d(TAG, "subscribe: onComplete")
emitter.onComplete()
}
})
// 3,订阅
observable.subscribe(observer)
打印如下:
D/RxJavaObserverPattern: onSubscribe:
D/RxJavaObserverPattern: subscribe: onNext:发送值 a
D/RxJavaObserverPattern: onNext: t=a
D/RxJavaObserverPattern: subscribe: onNext:发送值 b
D/RxJavaObserverPattern: onNext: t=b
D/RxJavaObserverPattern: subscribe: onNext:发送值 c
D/RxJavaObserverPattern: onNext: t=c
D/RxJavaObserverPattern: subscribe: onNext:发送值 d
D/RxJavaObserverPattern: onNext: t=d
D/RxJavaObserverPattern: subscribe: onComplete
D/RxJavaObserverPattern: onComplete:
可以看到,RxJava 的观察者模式流程和普通的观察者模式是一样的:
1,创建观察者;
2,创建被观察者;
3,订阅;
4,发送事件。
思考一下:
注释掉这行代码:observable.subscribe(observer)
,控制台的打印会是什么样子的?
答:没有任何打印。
小结一下:
类别 | 事件类型 | 订阅方式 | 不订阅是否会发送事件? | 谁发送事件? | 观察者与被观察者对应关系 |
---|---|---|---|---|---|
普通的观察者模式 | 只有普通事件 | addObserver() 或类似的方法 |
会 | 被观察者 | 多对一 |
RxJava的观察者模式 | 除了普通事件(onNext() ),还有两个特殊事件:(onComplete() 和 onError() ) |
subscribe() |
否 | 发射器 | 一对一 |
注意,表格里说 RxJava 的观察者模式不订阅不会发送事件并不严谨。实际上,要分情况说明:
- 热启动
Observable
任何时候都会发送消息,即使没有任何观察者监听它。 - 冷启动
Observable
只有在至少有一个订阅者的时候才会发送消息。
如果本文无特殊说明,Observable
均指的是冷启动 Observable
。
到这里,我们有几个疑问:
1,为什么 RxJava 观察者模式里不订阅(subscribe
)就不会发送事件呢?
2,RxJava 里发射器发送事件是如何到达观察者(Observer
)的回调方法里面的?
3,为什么发射器没有调用 onSubscribe
方法,自定义观察者却回调了 onSubscribe
方法呢?
接着往下看吧,我们一定要把这些问题搞明白了。
2.2 RxJava 观察者模式的订阅流程
针对 2.1 里面的 RxJava 观察者模式的代码,画出时序图如下:
看完时序图,不知道大家有没有得到问题的答案。
不过,不用担心,下面从源码上进行说明。现在就开始吧。
2.2.1 创建自定义 Observer
对象
Observer
是一个泛型接口,
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
通过创建一个实现自 Observer
的匿名类的对象,就得到自定义观察者对象了:
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
}
override fun onNext(t: String) {
Log.d(TAG, "onNext: t=$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError: ", e)
}
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
}
这里传给类型参数T
的实际参数是 String
,这与发送的数据类型是一致的。
2.2.2 创建 ObservableOnSubscribe
对象
ObservableOnSubscribe
也是一个泛型接口,并且是一个函数式接口:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
通过创建一个实现自 ObservableOnSubscribe
的匿名类的对象,作为传递给 Observable.create()
方法的参数:
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
}
}
2.2.3 创建 ObservableCreate
对象
查看 Observable
的静态方法 create()
方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这里我们不考虑 RxJavaPlugins.onAssembly()
方法会调用钩子函数,所以上面的方法可以写成这样子:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
这里返回的是一个 ObservableCreate
对象,而 ObservableCreate
是 Observable
的子类:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
可以看到,ObservableCreate
通过带参构造函数接收并持有了 ObservableOnSubscribe
对象。
val observableCreate = Observable.create(
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
}
}
)
2.2.4 调用Observable.subscribe()
订阅方法
observableCreate.subscribe(observer)
同样地,我们把 subscribe()
方法简化如下:
public final void subscribe(Observer<? super T> observer) {
// 省略了校验函数和钩子函数
subscribeActual(observer);
}
在 Observable
类中,subscribeActual()
方法是一个抽象方法:
protected abstract void subscribeActual(Observer<? super T> observer);
那么,我们必须找到 subscribeActual()
方法的实现。
在上一步中,我们知道返回的Observable
对象就是一个 ObservableCreate
对象。因此,我们有理由认为 ObservableCreate
类里实现了 subscribeActual()
方法。
查看 ObservableCreate
类,果真有 subscribeActual()
方法的实现:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
在 subscribeActual()
方法里面:
首先创建了一个 CreateEmitter
对象,这是发射器对象;
然后调用了自定义观察者 observer
对象的 onSubscribe()
方法,表示通知观察者订阅成功了;
最后调用 ObservableOnSubscribe
对象的 subscribe()
方法,接收发射器对象。
2.2.5 创建 CreateEmitter
对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
这里把 CreateEmitter
进行简化处理:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
observer.onError(t);
return false;
}
@Override
public void onComplete() {
observer.onComplete();
}
// 省略一些与分析无关的代码
}
可以看到:
CreateEmitter
类通过构造函数接收了观察者对象并且持有了观察者对象;CreateEmitter
类中的onNext()
方法内部会调用观察者对象的onNext()
方法,onComplete()
方法内部会调用观察者对象的onComplete()
方法。
2.2.6 发送 onNext()
以及 onComplete()
事件
在 2.2.4 里,我们知道 ObservableOnSubscribe
对象的 subscribe()
方法,接收了发射器 CreateEmitter
对象;
在 2.2.5 中,我们知道 CreateEmitter
类通过构造函数接收了观察者对象并且持有了观察者对象。CreateEmitter
类中的 onNext()
方法内部会调用观察者对象的 onNext()
方法,onComplete()
方法内部会调用观察者对象的 onComplete()
方法。
因此,发射器发送的 onNext
事件会到达观察者的 onNext
回调, onComplete
事件会到达观察者的 onComplete
回调。
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d(TAG, "subscribe: onNext:发送值 a")
emitter.onNext("a")
Log.d(TAG, "subscribe: onNext:发送值 b")
emitter.onNext("b")
Log.d(TAG, "subscribe: onNext:发送值 c")
emitter.onNext("c")
Log.d(TAG, "subscribe: onNext:发送值 d")
emitter.onNext("d")
Log.d(TAG, "subscribe: onComplete")
emitter.onComplete()
}
}
现在我们来回答 2.1 中提出的两个问题吧:
1,为什么 RxJava 观察者模式里不订阅(subscribe
)就不会发送事件呢?
因为只有在订阅时,才会创建发射器;而没有发射器,何谈发送事件呢。
2,RxJava 里发射器发送事件是如何到达自定义观察者(Observer
)的回调方法里面的?
发射器持有了自定义观察者对象,在调用发射器的
onNext
方法,内部会调用自定义观察者的onNext
方法;在调用发射器的onComplete
方法,内部会调用自定义观察者的onComplete
方法。这时候,发射器就委托观察者来处理事件了。
3,为什么发射器没有调用 onSubscribe
方法,自定义观察者却回调了 onSubscribe
方法呢?
2.2.4 中,在调用
ObservableCreate
对象的subscribeActual
方法中,调用了自定义观察者的onSubscribe
方法,表示通知观察者订阅成功了。
到这里,有些同学可能会想,为什么 RxJava 实现观察者模式采用的代码比普通的观察者要复杂那么多?
其实,我们目前演示的只是订阅流程,RxJava 强大的操作符还没有出现。
到这里,文章已经有一定的长度了。但是,大家一定要继续往下看,因为江湖传言:不懂操作符,不知 RxJava。
2.3 RxJava 的 map 操作符的流程是什么样子的?
打开 Observable
类浏览一下,可以看到有很多操作符:
- 创建操作符:
just
、interval
、create
、defer
等; - 变换操作符:
map
、flatMap
、concatMap
; - 合并操作符:
zip
、count
、concatArray
、mergeArrray
等; - 过滤操作符:
filter
、skip
、elementAt
、throttleFirst
等; - 条件操作符:
all
、takeWhile
、contains
、isEmpty
等; - do系列操作符:
doOnNext
、doOnSubscribe
、doOnComplete
等; - 其他操作符:
retry
、repeat
、delay
、timeout
等。
这么多操作符,该怎么掌握呢?
先掌握重点的,然后一个一个学习。
这里我们从 map
操作符入手,先看一个在之前的例子上使用 map
操作符的例子,实现的需求是依次发送 “a”
、 “b”
、 “c”
、 “d”
,观察者接收到的是 97
、98
、99
、100
,这里面就是使用 map
变换操作符来实现的。
// 1, 创建观察者
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: ")
}
override fun onNext(t: Int) {
Log.d(TAG, "onNext: t=$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError: ", e)
}
override fun onComplete() {
Log.d(TAG, "onComplete: ")
}
}
// 2, 创建被观察者
val observableCreate = Observable.create(
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
// 5, 发送事件
Log.d(TAG, "subscribe: onNext:发送值 a")
emitter.onNext("a")
Log.d(TAG, "subscribe: onNext:发送值 b")
emitter.onNext("b")
Log.d(TAG, "subscribe: onNext:发送值 c")
emitter.onNext("c")
Log.d(TAG, "subscribe: onNext:发送值 d")
emitter.onNext("d")
Log.d(TAG, "subscribe: onComplete")
emitter.onComplete()
}
}
)
// 3,使用 map 操作符对发送的数据进行变换
val observableMap = observableCreate.map(object : Function<String, Int> {
override fun apply(t: String): Int {
// 示取出索引为0的元素,数据类型是 Char,然后转换为 Int 类型。
return t[0].toInt()
}
})
// 4,订阅
observableMap.subscribe(observer)
代码改动的地方有:
1,观察者对象接收的数据类型由 String
改为 Int
;
2,添加使用 map
操作符:
// 3,使用 map 操作符对发送的数据进行变换
val observableMap = observableCreate.map(object : Function<String, Int> {
override fun apply(t: String): Int {
// 取出索引为0的元素,数据类型是 Char,然后转换为 Int 类型。
return t[0].toInt()
}
})
3,使用 observableMap
而非 observableCreate
来订阅:
observableMap.subscribe(observer)
运行一下,打印日志如下:
D/RxJavaMapOperator: onSubscribe:
D/RxJavaMapOperator: subscribe: onNext:发送值 a
D/RxJavaMapOperator: onNext: t=97
D/RxJavaMapOperator: subscribe: onNext:发送值 b
D/RxJavaMapOperator: onNext: t=98
D/RxJavaMapOperator: subscribe: onNext:发送值 c
D/RxJavaMapOperator: onNext: t=99
D/RxJavaMapOperator: subscribe: onNext:发送值 d
D/RxJavaMapOperator: onNext: t=100
D/RxJavaMapOperator: subscribe: onComplete
D/RxJavaMapOperator: onComplete:
查看日志打印,可以看到我们通过使用 map
操作符满足了需求。
代码已经了解了,现在我们看一下绘制好的 map 操作符时序图(点击图片可以查看大图):
这个图里边包含的内容确实有点多,看着可能会头晕。
不过,不要忘记了,我们只是在基本的订阅流程里,加了 map
操作符的使用而已。
在分析源码之前,还是先提几个问题:
map
操作符的变换方法调用发生在什么地方?- 代码里面哪些部分用到了装饰器模式?
2.3.1 创建自定义Observer
对象
参见 2.2.1 创建自定义 Observer
对象
2.3.2 创建 ObservableOnSubscribe
对象
参见 2.2.2 创建 ObservableOnSubscribe
对象
2.3.3 创建 ObservableCreate
对象
参见 2.2.3 创建 ObservableCreate
对象
2.3.4 创建封装了变换操作的Function
对象
先看一下 Function
接口,它是一个函数式接口,封装了 apply
方法,用于进行变换操作,即把输入值进行一些操作(值变换,类型变换)后返回:
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
这里所做的变换是把 String
类型的数据,如 "a"
,变换为 Int
类型的数据,如 97
:
object : Function<String, Int> {
override fun apply(t: String): Int {
// 取出索引为0的元素,数据类型是 Char,然后转换为 Int 类型。
return t[0].toInt()
}
}
2.3.5 创建 ObservableMap
对象
查看 Observable
类中的 map
方法:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
删繁就简为:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
这里返回的是 ObservableMap
对象,而ObservableMap
是 Observable
的子类:
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
// 省略了与本次分析无关的代码
}
可以看到,通过创建 ObservableMap
对象,让 ObservableMap
对象持有了 ObservableCreate
对象以及封装了变换操作的 Function
对象。
这里我们停下来,绘制一份类图如下:
从类图可以知道,这就是典型的装饰器模式了。
这里使用装饰器模式的好处是什么呢?
在不改变现有的 ObservableCreate
对象结构的基础上,通过具体装饰角色 ObservableMap
动态地给 ObservableCreate
对象(核心功能)添加了变换数据类型的功能;同时也实现了链式调用。
到这里配置的步骤已经完成了,下边开始进行订阅了。
2.3.6 调用 ObservableMap
的 subscribe
方法
内部实际调用的是 ObservableMap
的 subscribeActual
方法,传递的参数是观察者对象:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
// source 就是 ObservableCreate 对象
source.subscribe(new MapObserver<T, U>(t, function));
}
}
2.3.7 创建 MapObserver
对象
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
}
通过 MapObserver
的构造函数,MapObserver
对象持有了观察者对象和 Function
对象。
这里我们有必要再停一下,绘制一份类图如下:
这里同样使用了装饰器模式。
那么,这里使用装饰器模式的好处又是什么呢?
在不改变现有的自定义 Observer
对象结果的情况下,通过具体装饰角色 MapObserver
对象动态地自定义 Observer
对象(核心功能)添加一些职责(进行数据变换)。
2.3.8 调用 ObservableCreate
对象的 subscribe
方法
在 2.3.5 中,我们知道 ObservableMap
对象持有了 ObservableCreate
对象以及封装了变换操作的 Function
对象。
因此,下面代码里的 source
就是 ObservableCreate
对象:
source.subscribe(new MapObserver<T, U>(t, function))
内部实际调用的是 ObservableCreate
类的 subscribeActual
方法:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
传递给 subscribeActual
方法的实参是 MapObserver
对象。
在 subscribeActual
方法里:
首先创建了一个 CreateEmitter
对象,参数是 MapObserver
对象,这就创建了发射器对象;
然后调用了MapObserver
对象的 onSubscribe()
方法,表示通知观察者订阅成功了;
最后调用 ObservableOnSubscribe
对象的 subscribe()
方法,接收发射器对象。
2.3.9 创建 CreateEmitter
对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
这里把 CreateEmitter
进行简化处理:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
observer.onError(t);
return false;
}
@Override
public void onComplete() {
observer.onComplete();
}
// 省略一些与分析无关的代码
}
可以看到:
CreateEmitter
类通过构造函数接收了MapObserver
对象并且持有了MapObserver
对象;CreateEmitter
类中的onNext()
方法内部会调用MapObserver
对象的onNext()
方法,onComplete()
方法内部会调用MapObserver
对象的onComplete()
方法。
2.3.10 调用 MapObserver
对象的 onSubscribe
方法
observer.onSubscribe(parent);
其中,observer
是 MapObserver
对象,parent
是 CreateEmitter
对象。
调用在 MapObserver
的父类 BasicFuseableObserver
的 onSubscribe
方法:
public final void onSubscribe(Disposable d) {
// 删除与分析无关的代码
downstream.onSubscribe(this);
}
在 2.3.5 中我们知道,downstream
是观察者对象。所以,这里会调用观察者对象的 onSubscribe
方法。
2.3.11 CreateEmitter
对象调用 onNext
以及 onComplete
方法
在 2.3.9 中,我们知道:
CreateEmitter
类通过构造函数接收了MapObserver
对象并且持有了MapObserver
对象;CreateEmitter
类中的onNext()
方法内部会调用MapObserver
对象的onNext()
方法,onComplete()
方法内部会调用MapObserver
对象的onComplete()
方法。
2.3.12 MapObserver
对象调用 onNext
以及 onComplete
方法
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
protected final Observer<? super R> downstream;
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
@Override
public final void onSubscribe(Disposable d) {
downstream.onSubscribe(this);
}
@Override
public void onComplete() {
downstream.onComplete();
}
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
downstream.onNext(v);
}
}
MapObserver
类中的 onNext()
方法内部会先使用Function
对象进行变换操作,再调用自定义观察者对象的 onNext()
方法,onComplete()
方法内部会调用自定义观察者的 onComplete()
方法。
到这里,map
操作符的流程已经分析完毕了。
我们一起来回答本节开头提出的两个问题吧:
map
操作符的变换方法调用发生在什么地方?
答:在 2.3.12 里面,
MapObserver
对象的onNext
方法中发生调用的。
代码里面哪些部分用到了装饰器模式?
答:代码里面为了扩展
ObservableCreate
对象的功能时,用到了装饰器模式;为了扩展自定义Observer
对象的功能,再次用到了装饰器模式。
2.4 subscribeOn()
流程分析
扔物线大佬的文章中这样写:
RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。
首先,我们先引入一个小例子,演示 subscribeOn()
方法的使用:
对演示 RxJava 流程的代码进行修改,仅在发送事件之前增加休眠时间以及增加线程打印,代码如下:
// 1, 创建观察者
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "onSubscribe: currThread=${
Thread.currentThread().name}")
}
override fun onNext(t: String) {
Log.d(TAG, "onNext: t=$t, currThread=${
Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError: ", e)
}
override fun onComplete() {
Log.d(TAG, "onComplete: currThread=${
Thread.currentThread().name}")
}
}
// 2, 创建被观察者
val observableCreate = Observable.create(
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
// 4, 发送事件
TimeUnit.SECONDS.sleep(3L)
Log.d(TAG, "subscribe: onNext:发送值 a, currThread=${
Thread.currentThread().name}")
emitter.onNext("a")
TimeUnit.SECONDS.sleep(3L)
Log.d(TAG, "subscribe: onComplete, currThread=${
Thread.currentThread().name}")
emitter.onComplete()
}
}
)
// 3,订阅
observableCreate.subscribe(observer)
运行程序,打印日志如下:
D/RxJavaTimeConsuming: onSubscribe: currThread=main
D/RxJavaTimeConsuming: subscribe: onNext:发送值 a, currThread=main
D/RxJavaTimeConsuming: onNext: t=a, currThread=main
D/RxJavaTimeConsuming: subscribe: onComplete, currThread=main
D/RxJavaTimeConsuming: onComplete: currThread=main
大家看看日志,都是主线程打印的。没有问题,因为我们根本就没有新启线程,所以只能是主线程。
眼尖的你很快发现:在发送事件之前休眠了 3s,这好像不合理吧?
这都被你看出来,哈哈。我们希望发送事件的部分可以运行在子线程里面。这就要用到 subscribeOn()
方法了。
修改代码如下:
// 3,指定在 io 线程里发送事件
val observableSubscribeOn = observableCreate.subscribeOn(Schedulers.io())
// 4,订阅
observableSubscribeOn.subscribe(observer)
再次运行程序,打印日志如下:
D/RxJavaSubscribeOn: onSubscribe: currThread=main
D/RxJavaSubscribeOn: subscribe: onNext:发送值 a, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: onNext: t=a, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: subscribe: onComplete, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: onComplete: currThread=RxCachedThreadScheduler-1
我们再看一下日志,
- 自定义观察者的
onSubscribe
方法在主线程回调; - 发送事件的方法:
onNext
和onComplete
,是在子线程调用的; - 自定义观察者的
onNext
和onComplete
,是在子线程回调的(和发送事件的子线程是一样的)。
好了,这里同样产生了几个疑问:
- 为什么自定义观察者的
onSubscribe
方法在主线程回调,而不是在子线程? - 为什么发送事件的方法:
onNext
和onComplete
,是在子线程调用的?子线程是在哪里创建的? - 为什么自定义观察者的
onNext
和onComplete
,是在子线程回调的?为什么子线程和发送事件的子线程是一样的?
这里把上述代码的时序图(一定要点击放大查看)绘制如下:
现在我们开始看源码了,大家准备好了吗?
2.4.1 创建自定义Observer
对象
参见 2.2.1 创建自定义 Observer
对象
2.4.2 创建 ObservableOnSubscribe
对象
参见 2.2.2 创建 ObservableOnSubscribe
对象
2.4.3 创建 ObservableCreate
对象
参见 2.2.3 创建 ObservableCreate
对象
2.4.4 调用 Schedulers.io()
方法
这行代码返回的是 Scheduler
对象:
Schedulers.io()
查看 Schedulers
类中的 io()
方法:
// Schedulers 类中:
static final Scheduler IO;
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
// RxJavaPlugins 类中:
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
化繁为简为:
// Schedulers 类中:
static final Scheduler IO;
static {
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public static Scheduler io() {
return IO;
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
// RxJavaPlugins 类中:
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
return defaultScheduler.call();
}
总结一下:
Schedulers.io()
就是创建了一个单例的 IoScheduler
对象。
2.4.5 调用 ObservableCreate
对象的 subscribeOn
方法
传递的实际参数为IoScheduler
单例对象
public final Observable<T> subscribeOn(Scheduler scheduler) {
// 这是简化后的代码
return new ObservableSubscribeOn<T>(this, scheduler);
}
在方法里面,创建了一个 ObservableSubscribeOn
对象:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
通过 ObservableSubscribeOn
的构造方法,把 ObservableCreate
对象赋值给 source
变量,把 IoScheduler
单例对象赋值给 scheduler
变量。
2.4.6 调用 ObservableSubscribeOn
的 subscribe
方法,参数为自定义观察者
实际调用的是 ObservableSubscribeOn
的 subscribeActual
方法,传递的实参是自定义观察者。
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
在方法内部:
- 创建
SubscribeOnObserver
对象,参数是自定义观察者; - 调用自定义观察者的
onSubscribe
方法,自定义观察者的onSubscribe
方法会在当前线程回调; - 创建
SubscribeTask
对象,参数是SubscribeOnObserver
对象; - 调用
IoScheduler
单例对象的scheduleDirect
方法,参数是SubscribeTask
对象。
2.4.7 创建 SubscribeOnObserver
对象
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
}
通过构造方法,传递自定义观察者对象,赋值给 downstream
字段。
2.4.8 创建 SubscribeTask
对象
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask
实现了 Runnable
接口,因此它是一个自定义任务类。通过构造方法,传递 SubscribeOnObserver
对象,赋值给 parent
字段。
2.4.9 调用 IoScheduler
单例对象的 scheduleDirect()
方法
传递给 scheduleDirect
方法的实参是 SubscribeTask
对象。
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 2.4.10
final Worker w = createWorker();
final Runnable decoratedRun = run; // 此行去除了 hook 操作
DisposeTask task = new DisposeTask(decoratedRun, w);
// 2.4.11
w.schedule(task, delay, unit);
return task;
}
这里 DisposeTask
与 SubscribeTask
的关系是一种包装,绘制类图如下:
2.4.10 调用 IoScheduler
单例对象的 createWorker()
方法
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
}
createWorker
方法内部创建 EventLoopWorker
对象,并返回 EventLoopWorker
对象。
2.4.11 调用 EventLoopWorker
对象的 schedule
方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// 省略无关分析的代码
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
传递给形参 action
的实参是 DisposeTask
对象。
threadWorker
是 ThreadWorker
对象,是在创建 EventLoopWorker
时被初始化的。
2.4.12 调用 ThreadWorker
对象的 scheduleActual
方法
在 NewThreadWorker
类中查看:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime,
@NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = run; // 去除了 hook 操作
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f = executor.submit((Callable<Object>)sr);
sr.setFuture(f);
return sr;
}
在方法内部,ScheduledRunnable
包装了 DisposeTask
,绘制类图如下:
Future<?> f = executor.submit((Callable<Object>)sr);
把 ScheduledRunnable
对象交给线程池执行,就会执行 ScheduledRunnable
对象的 run()
方法。
注意!注意!注意! 这里以后的代码都是运行在子线程里面了。
2.4.13 调用 ScheduledRunnable
对象的 run()
方法
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
final Runnable actual;
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
// actual 是 DisposeTask 对象
actual.run();
}
}
在 run()
方法里面,调用的是 DisposeTask
对象的 run()
方法。
2.4.14 调用 DisposeTask
对象的 run()
方法
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
final Runnable decoratedRun;
final Worker w;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
// decoratedRun 就是 SubscribeTask 对象
decoratedRun.run();
}
// 省略与分析无关的代码
}
2.4.15 调用 SubscribeTask
对象的 run()
方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// source 是 ObservableCreate 对象
source.subscribe(parent);
}
}
在 run()
方法内部,调用 ObservableCreate
对象的 subscribe()
方法,参数为 SubscribeOnObserver
对象。
2.4.16 调用 ObservableCreate
对象的 subscribe()
方法
实际调用的是 ObservableCreate
对象的 subscribeActual
方法,参数是 SubscribeOnObserver
对象。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
在 subscribeActual()
方法里面:
首先创建了一个 CreateEmitter
对象,即发射器对象,传递参数是 SubscribeOnObserver
对象;
然后调用了SubscribeOnObserver
对象的 onSubscribe()
方法,但是并不会再回调给自定义观察者了;
最后调用 ObservableOnSubscribe
对象的 subscribe()
方法,接收发射器对象。
至此订阅过程就结束了,下边就开始发送事件了。
2.4.17 发射器 CreateEmitter
发送 onNext
以及 onComplete
事件
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// observer 是 SubscribeOnObserver 对象
observer.onNext(t);
}
@Override
public void onComplete() {
// observer 是 SubscribeOnObserver 对象
observer.onComplete();
}
}
SubscribeOnObserver
类:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
// downstream 是 自定义观察者对象
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
// downstream 是 自定义观察者对象
downstream.onComplete();
}
}
发射器调用 onNext
方法 -> 调用 SubscribeOnObserver
对象的 onNext
方法 -> 调用自定义观察者的 onNext
方法。
发射器调用 onComplete
方法 -> 调用 SubscribeOnObserver
对象的 onComplete
方法 -> 调用自定义观察者的 onComplete
方法。
在 2.4.12 已经切为子线程了,所以自定义观察者的 onNext
方法和 onComplete
方法都是在子线程调用的。
回答本节开头的几个疑问:
-
为什么自定义观察者的
onSubscribe
方法在主线程回调,而不是在子线程?因为调用自定义观察者的
onSubscribe
方法时,还没有进行线程切换,所以它是在当前线程回调的,当前线程是主线程。 -
为什么发送事件的方法:
onNext
和onComplete
,是在子线程调用的?子线程是在哪里创建的?在 2.4.12 已经切为子线程了,所以自定义观察者的
onNext
方法和onComplete
方法都是在子线程调用的。
子线程是在createWorker()
中,通过创建ThreadWorker
对象初始化了线程池。 -
为什么自定义观察者的
onNext
和onComplete
,是在子线程回调的?为什么子线程和发送事件的子线程是一样的?在 2.4.12 已经切为子线程了,之后没有进行过线程切换,所以自定义观察者的
onNext
和onComplete
,是在和发送事件的子线程是一样的子线程回调的。
2.4.18 连续多次设置 subscribeOn()
,哪一次生效?
调整代码如下:
val observableSubscribeOn1 = observableCreate.subscribeOn(Schedulers.io()) // RxCachedThreadScheduler
val observableSubscribeOn2 = observableSubscribeOn1.subscribeOn(Schedulers.computation()) // RxComputationThreadPool
val observableSubscribeOn3 = observableSubscribeOn2.subscribeOn(Schedulers.newThread()) // RxNewThreadScheduler
// 4,订阅
observableSubscribeOn3.subscribe(observer)
运行结果如下:
D/RxJavaSubscribeOn: onSubscribe: currThread=main
D/RxJavaSubscribeOn: subscribe: onNext:发送值 a, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: onNext: t=a, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: subscribe: onComplete, currThread=RxCachedThreadScheduler-1
D/RxJavaSubscribeOn: onComplete: currThread=RxCachedThreadScheduler-1
可以看到,只有第一次设置的 subscribeOn()
生效了。
那么,为什么只有第一次设置的 subscribeOn()
生效了呢?
答案在下面的时序图里面了:
2.5 observeOn()
流程分析
本节开头,我们仍是先设定一个场景,这样分析更加具体。
对于 2.4 中的例子,我们修改如下:
// 2, 创建被观察者
val observableCreate = Observable.create(
object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
// 5, 子线程发送事件
thread(name = "WorkThread") {
TimeUnit.SECONDS.sleep(3L)
Log.d(TAG, "subscribe: onNext:发送值 a, currThread=${Thread.currentThread().name}")
emitter.onNext("a")
TimeUnit.SECONDS.sleep(3L)
Log.d(TAG, "subscribe: onComplete, currThread=${Thread.currentThread().name}")
emitter.onComplete()
}
}
}
)
// 3,指定在 main 线程里观察事件
val observableObserveOn= observableCreate.observeOn(AndroidSchedulers.mainThread())
// 4,订阅
observableObserveOn.subscribe(observer)
说明一下这里的改动:
- 在线程名字为
WorkerThread
的子线程里发送事件。这里其实可以用 2.4 中的subscribeOn()
方法来实现,但是为了使大家专注于分析observeOn()
,这里直接创建了一个子线程。 observableCreate.observeOn(AndroidSchedulers.mainThread())
,表示让自定义观察者在主线程观察事件。
好了,运行一下程序,看一下日志吧:
D/RxJavaObserveOnDemo: onSubscribe: currThread=main
D/RxJavaObserveOnDemo: subscribe: onNext:发送值 a, currThread=WorkThread
D/RxJavaObserveOnDemo: onNext: t=a, currThread=main
D/RxJavaObserveOnDemo: subscribe: onComplete, currThread=WorkThread
D/RxJavaObserveOnDemo: onComplete: currThread=main
可以看到,日志打印与我们的改动目的完全相符。
在继续分析之前,提出一个疑问:
从子线程切到主线程是如何完成的?
绘制时序图如下:
2.5.1 创建自定义 Observer
对象
见 2.2.1 创建自定义 Observer
对象
2.5.2 创建 ObservableOnSubscribe
对象
见 2.2.2 创建 ObservableOnSubscribe
对象
2.5.3 创建 ObservableCreate
对象
见 2.2.3 创建 ObservableCreate
对象
2.5.4 调用 AndroidSchedulers.mainThread()
方法
看下面的源码:
// 在 AndroidSchedulers 类中:
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
// 在 RxAndroidPlugins 类中:
public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Scheduler, Scheduler> f = onMainThreadHandler;
if (f == null) {
return scheduler;
}
return apply(f, scheduler);
}
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
return callRequireNonNull(scheduler);
}
return applyRequireNonNull(f, scheduler);
}
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
Scheduler scheduler = s.call();
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
去掉 hook 操作,化繁为简为:
// 在 AndroidSchedulers 类中:
public static Scheduler mainThread() {
return MAIN_THREAD;
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
// 在 RxAndroidPlugins 类中:
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
return scheduler.call();
总结一下:
AndroidSchedulers.mainThread()
返回的是一个单例的 HandlerScheduler
对象,这个对象持有一个可以切换到主线程的 Handler
对象。
2.5.5 调用 ObservableCreate
对象的 observeOn
方法,参数 HandlerScheduler
对象
在 Observable
类中:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
// 这里去掉了 hook 操作
return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}
可以看到,在 observeOn
方法内部,创建了一个 ObservableObserveOn
对象;并且,通过 ObservableObserveOn
的构造方法,使得 ObservableObserveOn
对象持有了 ObservableCreate
对象HandlerScheduler
对象;最后返回 ObservableObserveOn
对象。
2.5.6 创建 ObservableObserveOn
对象
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// 把 ObservableCreate 对象设置给 source 字段
super(source);
// 把 HandlerScheduler 对象设置给 scheduler 字段
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
2.5.7 调用 ObservableObserveOn
对象的 subscribe
方法,参数为自定义观察者
实际调用的是 ObservableObserveOn
类中的 subscribeActual
方法,参数为自定义观察者:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
// scheduler 是 HandlerScheduler 对象
Scheduler.Worker w = scheduler.createWorker();
// source 是 ObservableCreate 对象
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
2.5.8 调用 HandlerScheduler
对象的 createWorker
方法
在 HandlerScheduler
中:
public Worker createWorker() {
// handler 就是一个可以切换到主线程的 `Handler` 对象
return new HandlerWorker(handler, async);
}
HandlerWorker
是继承于 Worker
的一个类。
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
}
2.5.9 创建 ObserveOnObserver
对象,参数自定义观察者,HandlerWorker
对象
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
}
2.5.10 调用 ObservableCreate
对象的 subscribe
方法,参数为 ObserveOnObserver
对象
分析这行代码:
// source 是 ObservableCreate 对象
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
实际调用的是 ObservableCreate
的 subscribeActual
方法。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
}
在 subscribeActual()
方法里面:
首先创建了一个 CreateEmitter
对象,即发射器对象,并把 ObserveOnObserver
对象通过构造方法传递给发射器对象;
然后调用了ObserveOnObserver
对象的 onSubscribe()
方法,表示通知观察者订阅成功了;
最后调用 ObservableOnSubscribe
对象的 subscribe()
方法,接收发射器对象。
2.5.11 创建发射器,参数为 ObserveOnObserver
对象
这里我们直接把 CreateEmitter
化繁为简,关注主线流程:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
// 把 ObserveOnObserver 对象设置给 observer 字段
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
2.5.12 调用 ObserveOnObserver
对象的 onSubscribe
方法
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable d) {
// 初始化 SimpleQueue 对象,用于存放 onNext 方法发送的值
queue = new SpscLinkedArrayQueue<T>(bufferSize);
// downstream 是自定义观察者对象
downstream.onSubscribe(this);
}
}
}
可以看到,在方法内部调用了自定义观察者的 onSubscribe
方法,这样自定义观察者就收到了订阅成功的回调了。
2.5.13 调用发射器的onNext
方法,值为a
在发射器的 onNext
方法里会调用 ObserveOnObserver
对象的 onNext
方法。
看一下 ObserveOnObserver
的 onNext
方法:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Disposable upstream;
volatile boolean done;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
// 自定义观察者
this.downstream = actual;
// HandlerWorker 对象
this.worker = worker;
}
@Override
public void onSubscribe(Disposable d) {
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
queue.offer(t);
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
void schedule() {
worker.schedule(this);
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
// 从队列里取出元素
T v = q.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 发送 onNext 事件
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
drainNormal();
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
return false;
}
}
可以看到在 onNext
方法内部:
先调用 queue.offer(t);
把要发送的值放在队列里面;
然后调用 schedule
方法,内部调用了 HandlerWorker
对象的 schedule
方法,传递的参数是 ObserveOnObserver
对象(它实现了 Runnable
接口,所以可以直接传递)。
2.5.14 HandlerWorker
对象的schedule
方法,参数为ObserveOnObserver
对象
在 HandlerWorker
的 schedule
方法里:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
- 首先,创建了一个
ScheduledRunnable
对象,持有Handler
对象以及ObserveOnObserver
对象; - 然后,把
ScheduledRunnable
包装在一个Message
里面; - 最后,使用
Handler
对象发送消息,使ScheduledRunnable
对象在主线程执行。
2.5.15 调用 ScheduledRunnable
对象的 run
方法
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
// delegate 就是 ObserveOnObserver 对象
this.delegate = delegate;
}
@Override
public void run() {
try {
// 调用 ObserveOnObserver 对象的 run 方法
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
}
2.5.16 调用 ObserveOnObserver
对象的 run
方法
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> downstream;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Disposable upstream;
volatile boolean done;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
// 自定义观察者
this.downstream = actual;
// HandlerWorker 对象
this.worker = worker;
}
@Override
public void onSubscribe(Disposable d) {
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
queue.offer(t);
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
void schedule() {
worker.schedule(this);
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
// 从队列里取出元素
T v = q.poll();
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 发送 onNext 事件
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
drainNormal();
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
return false;
}
}
在 run
方法内部调用 drainNormal
方法;
从队列中取出发送的值,调用自定义观察者的 onNext
方法。
2.5.17 连续多次调用 observeOn()
,哪一次生效?
对于本节开头的例子,修改如下:
val observableObserveOn1 = observableCreate.observeOn(AndroidSchedulers.mainThread()) // main
val observableObserveOn2 = observableObserveOn1.observeOn(Schedulers.io()) // RxCachedThreadScheduler
val observableObserveOn3 = observableObserveOn2.observeOn(Schedulers.newThread()) // RxNewThreadScheduler
// 4,订阅
observableObserveOn3.subscribe(observer)
运行结果如下:
D/MultipleObserveOn: onSubscribe: currThread=main
D/MultipleObserveOn: subscribe: onNext:发送值 a, currThread=WorkThread
D/MultipleObserveOn: onNext: t=a, currThread=RxNewThreadScheduler-1
D/MultipleObserveOn: subscribe: onComplete, currThread=WorkThread
D/MultipleObserveOn: onComplete: currThread=RxNewThreadScheduler-1
可以看到,只有最后一次 observeOn()
生效了。
这是为什么呢?
查看下面的时序图:
3. 最后
本文主要采用时序图的方式展示了 RxJava 的订阅流程,map变化操作符流程,线程切换流程;并用类图展示了 RxJava 里使用到的包装器模式。
虽然本文已经够长了,但是还有一些内容没有包括进去:
- RxJava 中的 hook 是做什么用的?
- RxJava 在工作中的最佳实践
- 工作中使用 RxJava 遇到的问题及解决办法。
- 自定义操作符
希望回头有时间可以补完。
代码、时序图以及类图源文件在这里。
参考
- 给 Android 开发者的 RxJava 详解
- Android主流三方库源码分析(五、深入理解RxJava源码)
- 详解 RxJava 的消息订阅和线程切换原理
- RxJava2.0——从放弃到入门
- RxJava入门之路(一)
- 这可能是最好的RxJava 2.x 教程(完结版)
- 反应式编程 RxJava 设计原理解析
- RxJava——扩展的观察者模式
- RxJava2源码分析-类的命名规则
- RxJava入门之介绍与基本运用
- RxJava入门
- 变种装饰器模式在rxjava中的应用
- Android:RxJava 3.0尝鲜,你做好准备了吗?
- 彻底搞清楚 RxJava 是什么东西
- 你会在实际工作中使用 rxjava 吗?
- RxJava 这8问,你顶得住吗?
- RxJava系列教程
- RxJava
- ReactiveX
文章评论