RxJava最迷人的是什么?
答案就是把异步序列写到一个工作流里!javascriptPromise/A如出一辙。
OK,在java中做异步的事情在我们传统理解过来可不方便,而且,如果要让异步按照我们的工作流来,就更困难了。

但是在RxJava中,我们只要调用调用
subscribOn()observeOn()就能切换我们的工作线程,是不是让小伙伴都惊呆了?

然后结合RxJavaOperator,写异步的时候,想切换线程就是一行代码的事情,整个workflow还非常清晰:

Observable.create()
// do something on io thread
.work() // work.. work..
.subscribeOn(Schedulers.io())
// observeOn android main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

我们再也不用去写什么见鬼的new ThreadHandler了,在这么几行代码里,我们实现了在io线程上做我们的工作(work),在main线程上,更新UI

Subscribe On

先看下subscribeOn干了什么

 public final Observable<T> subscribeOn(Scheduler scheduler) {if (this instanceof ScalarSynchronousObservable) {return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);}return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

啊,原来也是个lift,就是从一个Observable生成另外一个Observable咯,这个nest是干嘛用?

 public final Observable<Observable<T>> nest() {return just(this);
}

这里返回类型告诉我们,它是产生一个Observable<Observable<T>>
讲到这里,会有点晕,先记着这个,然后我们看OperatorSubscribeOn这个操作符,

构造函数是

public OperatorSubscribeOn(Scheduler scheduler) {this.scheduler = scheduler;
}

OK,这里保存了scheduler对象,然后就是我们前一章说过的转换方法。

 @Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {final Worker inner = scheduler.createWorker();subscriber.add(inner);return new Subscriber<Observable<T>>(subscriber) {@Overridepublic void onCompleted() {// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(final Observable<T> o) {inner.schedule(new Action0() {@Overridepublic void call() {final Thread t = Thread.currentThread();o.unsafeSubscribe(new Subscriber<T>(subscriber) {@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(T t) {subscriber.onNext(t);}@Overridepublic void setProducer(final Producer producer) {subscriber.setProducer(new Producer() {@Overridepublic void request(final long n) {if (Thread.currentThread() == t) {// don't schedule if we're already on the thread (primarily for first setProducer call)// see unit test 'testSetProducerSynchronousRequest' for more context on thisproducer.request(n);} else {inner.schedule(new Action0() {@Overridepublic void call() {producer.request(n);}});}}});}});}});}};
}

让人纠结的类模板

看完这段又臭又长的,先深呼吸一口气,我们慢慢分析下。
首先要注意RxJava里面最让人头疼的模板问题,那么OperatorMap这个类的声明是

public final class OperatorMap<T, R> implements Operator<R, T>

Operator这个接口继承Func1

public interface Func1<T, R> extends Function {R call(T t);
}

我们这里不要记TR,记住传入左边的模板是形参,传入右边的模板是返回值

好了,那么这里的call就是从一个T转换成一个Observable<T>的过程了。

总结一下,我们这一次调用subscribeOn,做了两件事

1、nest() 为Observable<T>生成了一个Observable<Observable<T>> 
2、lift() 对Observalbe<Observalbe<T>>进行一个变化,变回Observable<T>

因为lift是一个模板函数,它的返回值的类型是参照它的形参来,而他的形参是Operator<T, Observable<T>> 这个结论非常重要!!
OK,到这里我们已经存储了所有的序列,等着我们调用了。

调用链

首先,记录我们在调用这条指令之前的Observable<T>,记为Observable$1
然后,经过lift生成的Observable<T>记为Observable$2

好了,现在我们拿到的依然是Observable<T>这个对象,但是它不是原始的Observable$1,要深深记住这一点,它是由lift生成的Observable$2,这时候进行subscribe,那看到首先调用的就是OnSubscribe.call方法,好,直接进入lift当中生成的那个地方。

我们知道这一层liftoperator就是刚刚的OperatorSubscribOn,那么调用它的call方法,生成的是一个Subscriber<Observable<T>>

Subscriber<? super T> st = hook.onLift(operator).call(o);
try {// new Subscriber created and being subscribed with so 'onStart' itst.onStart();onSubscribe.call(st);
} catch (Throwable e) {
...
}

好,还记得我们调用过nest么?,这里的onSubscribe可是nest上下文中的噢,每一次,到这个地方,这个onSubscribe就是上一层ObservableonSubscribe,即Observable<Observable<T>>onSubscribe,相当于栈弹出了一层。它的call直接在SubscriberonNext中给出了最开始的Observable<T>,我们这里就要看下刚刚在OperatorSubscribeOn中生成的Subscriber

new Subscriber<Observable<T>>(subscriber) {@Overridepublic void onCompleted() {// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(final Observable<T> o) {inner.schedule(new Action0() {@Overridepublic void call() {final Thread t = Thread.currentThread();o.unsafeSubscribe(new Subscriber<T>(subscriber) {@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(T t) {subscriber.onNext(t);}});}});}
}

对,就是它,这里要注意,这里的subscriber就是我们在lift中,传入的o

Subscriber<? super T> st = hook.onLift(operator).call(o);

对,就是它,其实它就是SafeSubscriber

回过头,看看刚刚的onNext()方法,inner.schedule() 这个函数,我们可以认为就是postRun()类似的方法,而onNext()中传入的o是我们之前生成的Observable$1,是从Observable.just封装出来的Observable<Observable<T>>中产生的,这里调用了Observable$1.unsafeSubscribe方法,我们暂时不关心它和subscribe有什么不同,但是我们知道最终功能是一样的就好了。

注意它运行时的线程!!在inner这个Worker上!于是它的运行线程已经被改了!!

好,这里的unsafeSubscribe调用的方法就是调用原先Observable$1.onSubscribe中的call方法:
这个Observable$1就是我们之前自己定义的Observable了。

综上所述,如果我们需要我们的Observable$1在一个别的线程上运行的时候,只需要在后面跟一个subscribeOn即可。结合扔物线大大的图如下:

总结

这里逻辑着实不好理解。如果还没有理解的朋友,可以按照我前文说的顺序,细致的看下来,我把逻辑过一遍之后,发现lift的陷阱实在太大,内部类用的风生水起,一不小心,就不知道一个变量的上下文是什么,需要特别小心。

之前我们分析过subscribeOn这个函数,
现在我们来看下subscribeOnobserveOn这两个函数到底有什么异同。

用过rxjava的旁友都知道,subscribeOnobserveOn都是用来切换线程用的,可是我什么时候用subscribeOn,什么时候用observeOn呢,我们很少知道这两个区别是啥。

友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。

subscribeOn

先看下OperatorSubscribeOn的核心代码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {final Scheduler scheduler;final Observable<T> source;public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {this.scheduler = scheduler;this.source = source;}@Overridepublic void call(final Subscriber<? super T> subscriber) {final Worker inner = scheduler.createWorker();subscriber.add(inner);inner.schedule(new Action0() {@Overridepublic void call() {Subscriber<T> s = new Subscriber<T>(subscriber) {@Overridepublic void onNext(T t) {subscriber.onNext(t);}@Overridepublic void onError(Throwable e) {try {subscriber.onError(e);} finally {inner.unsubscribe();}}@Overridepublic void onCompleted() {try {subscriber.onCompleted();} finally {inner.unsubscribe();}}....};source.unsafeSubscribe(s);}});}
}

这里注意两点:

  1. 因为OperatorSubscribeOn是个OnSubscribe对象,所以在call参数中传入的subscriber就是我们在外面使用Observable.subscribe(a)传入的对象a

  2. 这里source对象指向的是调用subscribeOn之前的那个Observable序列。

明确了这两点,我们就很好的知道了subscribeOn是如何工作,产生神奇的效果了。
其实最最主要的就是一行函数

source.unsafeSubscribe(s);

并且要注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。

observeOn

同样看下OperatorObserveOn这个类的主要代码:

public final class OperatorObserveOn<T> implements Operator<T, T> {private final Scheduler scheduler;private final boolean delayError;/*** @param scheduler the scheduler to use* @param delayError delay errors until all normal events are emitted in the other thread?*/public OperatorObserveOn(Scheduler scheduler, boolean delayError) {this.scheduler = scheduler;this.delayError = delayError;}@Overridepublic Subscriber<? super T> call(Subscriber<? super T> child) {....ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);parent.init();return parent;}/** Observe through individual queue per observer. */private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {final Subscriber<? super T> child;final Scheduler.Worker recursiveScheduler;final NotificationLite<T> on;final boolean delayError;final Queue<Object> queue;// the status of the current streamvolatile boolean finished;final AtomicLong requested = new AtomicLong();final AtomicLong counter = new AtomicLong();/** * The single exception if not null, should be written before setting finished (release) and read after* reading finished (acquire).*/Throwable error;// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should// not prevent anything downstream from consuming, which will happen if the Subscription is chainedpublic ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {this.child = child;this.recursiveScheduler = scheduler.createWorker();this.delayError = delayError;this.on = NotificationLite.instance();if (UnsafeAccess.isUnsafeAvailable()) {queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);} else {queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);}}void init() {// don't want this code in the constructor because `this` can escape through the // setProducer callSubscriber<? super T> localChild = child;localChild.setProducer(new Producer() {@Overridepublic void request(long n) {if (n > 0L) {BackpressureUtils.getAndAddRequest(requested, n);schedule();}}});localChild.add(recursiveScheduler);localChild.add(this);}@Overridepublic void onStart() {// signal that this is an async operator capable of receiving this manyrequest(RxRingBuffer.SIZE);}@Overridepublic void onNext(final T t) {if (isUnsubscribed() || finished) {return;}if (!queue.offer(on.next(t))) {onError(new MissingBackpressureException());return;}schedule();}@Overridepublic void onCompleted() {if (isUnsubscribed() || finished) {return;}finished = true;schedule();}@Overridepublic void onError(final Throwable e) {if (isUnsubscribed() || finished) {RxJavaPlugins.getInstance().getErrorHandler().handleError(e);return;}error = e;finished = true;schedule();}protected void schedule() {if (counter.getAndIncrement() == 0) {recursiveScheduler.schedule(this);}}// only execute this from schedule()@Overridepublic void call() {long emitted = 0L;long missed = 1L;// these are accessed in a tight loop around atomics so// loading them into local variables avoids the mandatory re-reading// of the constant fieldsfinal Queue<Object> q = this.queue;final Subscriber<? super T> localChild = this.child;final NotificationLite<T> localOn = this.on;// requested and counter are not included to avoid JIT issues with register spilling// and their access is is amortized because they are part of the outer loop which runs// less frequently (usually after each RxRingBuffer.SIZE elements)for (;;) {long requestAmount = requested.get();long currentEmission = 0L;while (requestAmount != currentEmission) {boolean done = finished;Object v = q.poll();boolean empty = v == null;if (checkTerminated(done, empty, localChild, q)) {return;}if (empty) {break;}localChild.onNext(localOn.getValue(v));currentEmission++;emitted++;}if (requestAmount == currentEmission) {if (checkTerminated(finished, q.isEmpty(), localChild, q)) {return;}}if (currentEmission != 0L) {BackpressureUtils.produced(requested, currentEmission);}missed = counter.addAndGet(-missed);if (missed == 0L) {break;}}if (emitted != 0L) {request(emitted);}}boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {if (a.isUnsubscribed()) {q.clear();return true;}if (done) {if (delayError) {if (isEmpty) {Throwable e = error;try {if (e != null) {a.onError(e);} else {a.onCompleted();}} finally {recursiveScheduler.unsubscribe();}}} else {Throwable e = error;if (e != null) {q.clear();try {a.onError(e);} finally {recursiveScheduler.unsubscribe();}return true;} elseif (isEmpty) {try {a.onCompleted();} finally {recursiveScheduler.unsubscribe();}return true;}}}return false;}}
}

这里的代码有点长,我们先注意到它是一个Operator,它没有对上层Observable做任何的控制或者包装。

既然是Operator,那么它的职责就是把一个Subscriber转换成另外一个Subscriber, 我们来关注下转换后的Subscriber对转换前的Subscriber做了些什么事。

首先它是一个ObserveOnSubscriber类, 既然是Subscriber那么肯定有onNextonComplete 和onError 看最主要的onNext

@Override
public void onNext(final T t) {if (isUnsubscribed() || finished) {return;}if (!queue.offer(on.next(t))) {onError(new MissingBackpressureException());return;}schedule();
}

好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

protected void schedule() {if (counter.getAndIncrement() == 0) {recursiveScheduler.schedule(this);}
}

recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()对吧、

接下去,我们看下在scheduler中调用的call方法,这里只列出主要带代码

@Override
public void call() {...final Subscriber<? super T> localChild = this.child;for (;;) {...boolean done = finished;Object v = q.poll();boolean empty = v == null;if (checkTerminated(done, empty, localChild, q)) {return;}if (empty) {break;}localChild.onNext(localOn.getValue(v));...}if (emitted != 0L) {request(emitted);}
}

OK,在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

那么总结起来的结论就是:

  1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上

  2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOnsubscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

总结

如果我们有一段这样的序列

Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特别注意
.subscribe(handleData)

假设这里我们是在主线程上调用这段代码,
那么

  1. 操作1操作2是在io线程上,因为之后subscribeOn切换了线程

  2. 操作3操作4也是在io线程上,因为在subscribeOn切换了线程之后,并没有发生改变。

  3. 操作5操作6是在main线程上,因为在他们之前的observeOn切换了线程。

  4. 特别注意那一段,对于操作5操作6是无效的

再简单点总结就是

  1. subscribeOn的调用切换之前的线程。

  2. observeOn的调用切换之后的线程。

  3. observeOn之后,不可再调用subscribeOn 切换线程

=========
续 特别感谢@扔物线给的额外的总结

  1. 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件

  2. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)

  3. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()

  4. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作

  5. 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程

迷之RxJava —— 线程切换相关推荐

  1. RxJava 线程切换

    前言 在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程.如果觉得源码枯燥可以直接移至文末看图理解. 实例代码 Observable.create(new Observ ...

  2. android rxjava 多线程,你真的了解RxJava的线程切换吗?

    使用RxJava可以轻松地实现线程切换,所以在Android中常被用来替代AsyncTask.Handler等原生工具类.使用起来虽然简单,但如果不了解其背后的基本原理,很可能因为使用不当而写出bug ...

  3. 线程切换哪家强?RxJava与Flow的操作符对比

    作者:fundroid 链接:https://juejin.cn/post/6943037393893064734#heading-13 Flow作为Coroutine版的RxJava,同RxJava ...

  4. java子线程切换到主线程_Android子线程切换到UI线程方法总结

    线程切换 通过消息发送(发布)和接收(订阅)的方式切换的. 1 .Handler 子线程(非UI线程)调用handler对象sendMessage(msg)方法,将消息发送给关联Looper,Loop ...

  5. Rxjava2原理流程+操作符+线程切换 浅析~

    0.前言 没拜读过强大的代码就建议去稍微看一下rxjava2的原理,并不难懂.写的非常的好,也能领略到大佬写的代码有多么的强.里面的设计模式真的牛逼 1.Rxjava2 Rxjava2用于我们来做响应 ...

  6. 理解RxJava线程模型

    RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解.(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑 ...

  7. 理解 RxJava 线程模型

    http://blog.saymagic.cn/2016/08/20/understand-rxjava-threading-model.html 主题 线程 RxJava RxJava作为目前一款超 ...

  8. 应用退出前不让线程切换_用户级线程和内核级线程,你分清楚了吗?

    前天晚上有个伙伴私信我说在学进程和线程,问我有没有好的方法和学习教程,刚好我最近也在备相关的课. 班上不少学生学的还是很不错的.拿班上小白和小明的例子吧(艺名哈).小明接受能力很强,小白则稍差些. 关 ...

  9. 模拟线程切换 C++

    为什么80%的码农都做不了架构师?>>>    前言: 本文主要是剖析NachOs的线程切换原理,并通过一个简化的例子(就是将线程部分代码抽取出来再加以修改) 来说明.本文 gith ...

最新文章

  1. 英特尔携手中科院计算所建立中国首个 oneAPI 卓越中心
  2. 2020年,这些学者归国任教
  3. 非交互模式修改Ubuntu密码的命令
  4. 学python要多少钱-python学习费用多少合适
  5. rssi室内定位算法原理_三分钟看懂蓝牙室内定位 值得分享
  6. 实例32:python
  7. Git下使用Beyond Compare作为比较和合并工具
  8. 进入大厂的面试经验详细总结(P7 拿 offer)
  9. Install Kernel 3.10 on CentOS 6.5
  10. java 利用同步工具类控制线程
  11. java,python,scala发送http请求
  12. linux vim创建文件配置文件,vim linux 强大的配置文件
  13. 必须收藏!130 个相见恨晚的超实用网站,一次性分享出来
  14. 分享个网站首页弹窗代码
  15. ChucK学习笔记(零)——前言
  16. Netty实战二-实现UDP的单播和广播
  17. 软件工程作业一:从产品经理人角度分析微信求职招聘小程序
  18. RCWL-0516微波雷达模块检测人体移动(发光二极管)
  19. Python之文件处理-JSON文件
  20. matlab中buttord用法_matlab butter函数

热门文章

  1. android paint 字体,Android绘图之Paint的使用方法详解
  2. MultipartFile与File的互转
  3. 1.3. 一只大象口渴了,要喝20升水才能解渴,但现在只有一个深h厘米,底面半径为r厘米的小圆桶(h和r都是整数)。问大象至少要喝多少桶水才会解渴。编写程序输入半径和高度,输出需要的桶数(一定是整数)
  4. AssetManager
  5. 前端开发常用到的flex布局简单范例
  6. 华为S5720s配置dns解析使用ftp备份
  7. 常见的Linux操作系统
  8. OpenGL ES之glOrtho函数
  9. Python3 PNG文件格式及根据CRC检验码修复图片高度
  10. springBoot下java代码mysql数据库定时任务(创建表)