迷之RxJava —— 线程切换
RxJava
最迷人的是什么?
答案就是把异步序列写到一个工作流里!
和javascript
的Promise/A
如出一辙。
OK,在java
中做异步的事情在我们传统理解过来可不方便,而且,如果要让异步按照我们的工作流来,就更困难了。
但是在RxJava
中,我们只要调用调用
subscribOn()
和observeOn()
就能切换我们的工作线程,是不是让小伙伴都惊呆了?
然后结合RxJava
的Operator
,写异步的时候,想切换线程就是一行代码的事情,整个workflow
还非常清晰:
Observable.create()
// do something on io thread
.work() // work.. work..
.subscribeOn(Schedulers.io())
// observeOn android main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
我们再也不用去写什么见鬼的new Thread
和Handler
了,在这么几行代码里,我们实现了在io
线程上做我们的工作(work
),在main
线程上,更新UI
Subscribe On
先看下subscribeOn
干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {if (this instanceof ScalarSynchronousObservable) {return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);}return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
啊,原来也是个lift,就是从一个Observable
生成另外一个Observable
咯,这个nest
是干嘛用?
public final Observable<Observable<T>> nest() {return just(this);
}
这里返回类型告诉我们,它是产生一个Observable<Observable<T>>
讲到这里,会有点晕,先记着这个,然后我们看OperatorSubscribeOn
这个操作符,
构造函数是
public OperatorSubscribeOn(Scheduler scheduler) {this.scheduler = scheduler;
}
OK,这里保存了scheduler
对象,然后就是我们前一章说过的转换方法。
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {final Worker inner = scheduler.createWorker();subscriber.add(inner);return new Subscriber<Observable<T>>(subscriber) {@Overridepublic void onCompleted() {// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(final Observable<T> o) {inner.schedule(new Action0() {@Overridepublic void call() {final Thread t = Thread.currentThread();o.unsafeSubscribe(new Subscriber<T>(subscriber) {@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(T t) {subscriber.onNext(t);}@Overridepublic void setProducer(final Producer producer) {subscriber.setProducer(new Producer() {@Overridepublic void request(final long n) {if (Thread.currentThread() == t) {// don't schedule if we're already on the thread (primarily for first setProducer call)// see unit test 'testSetProducerSynchronousRequest' for more context on thisproducer.request(n);} else {inner.schedule(new Action0() {@Overridepublic void call() {producer.request(n);}});}}});}});}});}};
}
让人纠结的类模板
看完这段又臭又长的,先深呼吸一口气,我们慢慢分析下。
首先要注意RxJava
里面最让人头疼的模板问题,那么OperatorMap
这个类的声明是
public final class OperatorMap<T, R> implements Operator<R, T>
而Operator
这个接口继承Func1
public interface Func1<T, R> extends Function {R call(T t);
}
我们这里不要记T
和R
,记住传入左边的模板是形参,传入右边的模板是返回值
。
好了,那么这里的
call
就是从一个T
转换成一个Observable<T>
的过程了。
总结一下,我们这一次调用subscribeOn
,做了两件事
1、
nest()
为Observable<T>
生成了一个Observable<Observable<T>>
2、lift()
对Observalbe<Observalbe<T>>
进行一个变化,变回Observable<T>
因为lift
是一个模板函数,它的返回值的类型是参照它的形参来,而他的形参是Operator<T, Observable<T>>
这个结论非常重要!!
OK,到这里我们已经存储了所有的序列,等着我们调用了。
调用链
首先,记录我们在调用这条指令之前的Observable<T>
,记为Observable$1
然后,经过lift
生成的Observable<T>
记为Observable$2
好了,现在我们拿到的依然是Observable<T>
这个对象,但是它不是原始的Observable$1
,要深深记住这一点,它是由lift
生成的Observable$2
,这时候进行subscribe
,那看到首先调用的就是OnSubscribe.call
方法,好,直接进入lift
当中生成的那个地方。
我们知道这一层lift
的operator
就是刚刚的OperatorSubscribOn
,那么调用它的call
方法,生成的是一个Subscriber<Observable<T>>
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {// new Subscriber created and being subscribed with so 'onStart' itst.onStart();onSubscribe.call(st);
} catch (Throwable e) {
...
}
好,还记得我们调用过nest
么?,这里的onSubscribe
可是nest
上下文中的噢,每一次,到这个地方,这个onSubscribe
就是上一层Observable
的onSubscribe
,即Observable<Observable<T>>
的onSubscribe
,相当于栈弹出了一层。它的call
直接在Subscriber
的onNext
中给出了最开始的Observable<T>
,我们这里就要看下刚刚在OperatorSubscribeOn
中生成的Subscriber
new Subscriber<Observable<T>>(subscriber) {@Overridepublic void onCompleted() {// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(final Observable<T> o) {inner.schedule(new Action0() {@Overridepublic void call() {final Thread t = Thread.currentThread();o.unsafeSubscribe(new Subscriber<T>(subscriber) {@Overridepublic void onCompleted() {subscriber.onCompleted();}@Overridepublic void onError(Throwable e) {subscriber.onError(e);}@Overridepublic void onNext(T t) {subscriber.onNext(t);}});}});}
}
对,就是它,这里要注意,这里的subscriber
就是我们在lift
中,传入的o
Subscriber<? super T> st = hook.onLift(operator).call(o);
对,就是它,其实它就是SafeSubscriber
。
回过头,看看刚刚的onNext()
方法,inner.schedule()
这个函数,我们可以认为就是postRun()
类似的方法,而onNext()
中传入的o
是我们之前生成的Observable$1
,是从Observable.just
封装出来的Observable<Observable<T>>
中产生的,这里调用了Observable$1.unsafeSubscribe
方法,我们暂时不关心它和subscribe
有什么不同,但是我们知道最终功能是一样的就好了。
注意它运行时的线程!!在
inner
这个Worker
上!于是它的运行线程已经被改了!!
好,这里的unsafeSubscribe
调用的方法就是调用原先Observable$1.onSubscribe
中的call
方法:
这个Observable$1
就是我们之前自己定义的Observable
了。
综上所述,如果我们需要我们的Observable$1
在一个别的线程上运行的时候,只需要在后面跟一个subscribeOn
即可。结合扔物线大大的图如下:
总结
这里逻辑着实不好理解。如果还没有理解的朋友,可以按照我前文说的顺序,细致的看下来,我把逻辑过一遍之后,发现lift
的陷阱实在太大,内部类用的风生水起,一不小心,就不知道一个变量的上下文是什么,需要特别小心。
之前我们分析过subscribeOn
这个函数,
现在我们来看下subscribeOn
和observeOn
这两个函数到底有什么异同。
用过rxjava
的旁友都知道,subscribeOn
和observeOn
都是用来切换线程用的,可是我什么时候用subscribeOn
,什么时候用observeOn
呢,我们很少知道这两个区别是啥。
友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。
subscribeOn
先看下OperatorSubscribeOn
的核心代码:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {final Scheduler scheduler;final Observable<T> source;public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {this.scheduler = scheduler;this.source = source;}@Overridepublic void call(final Subscriber<? super T> subscriber) {final Worker inner = scheduler.createWorker();subscriber.add(inner);inner.schedule(new Action0() {@Overridepublic void call() {Subscriber<T> s = new Subscriber<T>(subscriber) {@Overridepublic void onNext(T t) {subscriber.onNext(t);}@Overridepublic void onError(Throwable e) {try {subscriber.onError(e);} finally {inner.unsubscribe();}}@Overridepublic void onCompleted() {try {subscriber.onCompleted();} finally {inner.unsubscribe();}}....};source.unsafeSubscribe(s);}});}
}
这里注意两点:
因为
OperatorSubscribeOn
是个OnSubscribe
对象,所以在call
参数中传入的subscriber
就是我们在外面使用Observable.subscribe(a)
传入的对象a
。这里
source
对象指向的是调用subscribeOn
之前的那个Observable
序列。
明确了这两点,我们就很好的知道了subscribeOn
是如何工作,产生神奇的效果了。
其实最最主要的就是一行函数
source.unsafeSubscribe(s);
并且要注意它所在的位置,是在worker的call
里面,说白了,就是把source.subscribe
这一行调用放在指定的线程里,那么总结起来的结论就是:
subscribeOn
的调用,改变了调用前序列所运行的线程。
observeOn
同样看下OperatorObserveOn
这个类的主要代码:
public final class OperatorObserveOn<T> implements Operator<T, T> {private final Scheduler scheduler;private final boolean delayError;/*** @param scheduler the scheduler to use* @param delayError delay errors until all normal events are emitted in the other thread?*/public OperatorObserveOn(Scheduler scheduler, boolean delayError) {this.scheduler = scheduler;this.delayError = delayError;}@Overridepublic Subscriber<? super T> call(Subscriber<? super T> child) {....ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);parent.init();return parent;}/** Observe through individual queue per observer. */private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {final Subscriber<? super T> child;final Scheduler.Worker recursiveScheduler;final NotificationLite<T> on;final boolean delayError;final Queue<Object> queue;// the status of the current streamvolatile boolean finished;final AtomicLong requested = new AtomicLong();final AtomicLong counter = new AtomicLong();/** * The single exception if not null, should be written before setting finished (release) and read after* reading finished (acquire).*/Throwable error;// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should// not prevent anything downstream from consuming, which will happen if the Subscription is chainedpublic ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {this.child = child;this.recursiveScheduler = scheduler.createWorker();this.delayError = delayError;this.on = NotificationLite.instance();if (UnsafeAccess.isUnsafeAvailable()) {queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);} else {queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);}}void init() {// don't want this code in the constructor because `this` can escape through the // setProducer callSubscriber<? super T> localChild = child;localChild.setProducer(new Producer() {@Overridepublic void request(long n) {if (n > 0L) {BackpressureUtils.getAndAddRequest(requested, n);schedule();}}});localChild.add(recursiveScheduler);localChild.add(this);}@Overridepublic void onStart() {// signal that this is an async operator capable of receiving this manyrequest(RxRingBuffer.SIZE);}@Overridepublic void onNext(final T t) {if (isUnsubscribed() || finished) {return;}if (!queue.offer(on.next(t))) {onError(new MissingBackpressureException());return;}schedule();}@Overridepublic void onCompleted() {if (isUnsubscribed() || finished) {return;}finished = true;schedule();}@Overridepublic void onError(final Throwable e) {if (isUnsubscribed() || finished) {RxJavaPlugins.getInstance().getErrorHandler().handleError(e);return;}error = e;finished = true;schedule();}protected void schedule() {if (counter.getAndIncrement() == 0) {recursiveScheduler.schedule(this);}}// only execute this from schedule()@Overridepublic void call() {long emitted = 0L;long missed = 1L;// these are accessed in a tight loop around atomics so// loading them into local variables avoids the mandatory re-reading// of the constant fieldsfinal Queue<Object> q = this.queue;final Subscriber<? super T> localChild = this.child;final NotificationLite<T> localOn = this.on;// requested and counter are not included to avoid JIT issues with register spilling// and their access is is amortized because they are part of the outer loop which runs// less frequently (usually after each RxRingBuffer.SIZE elements)for (;;) {long requestAmount = requested.get();long currentEmission = 0L;while (requestAmount != currentEmission) {boolean done = finished;Object v = q.poll();boolean empty = v == null;if (checkTerminated(done, empty, localChild, q)) {return;}if (empty) {break;}localChild.onNext(localOn.getValue(v));currentEmission++;emitted++;}if (requestAmount == currentEmission) {if (checkTerminated(finished, q.isEmpty(), localChild, q)) {return;}}if (currentEmission != 0L) {BackpressureUtils.produced(requested, currentEmission);}missed = counter.addAndGet(-missed);if (missed == 0L) {break;}}if (emitted != 0L) {request(emitted);}}boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {if (a.isUnsubscribed()) {q.clear();return true;}if (done) {if (delayError) {if (isEmpty) {Throwable e = error;try {if (e != null) {a.onError(e);} else {a.onCompleted();}} finally {recursiveScheduler.unsubscribe();}}} else {Throwable e = error;if (e != null) {q.clear();try {a.onError(e);} finally {recursiveScheduler.unsubscribe();}return true;} elseif (isEmpty) {try {a.onCompleted();} finally {recursiveScheduler.unsubscribe();}return true;}}}return false;}}
}
这里的代码有点长,我们先注意到它是一个Operator
,它没有对上层Observable
做任何的控制或者包装。
既然是Operator
,那么它的职责就是把一个Subscriber
转换成另外一个Subscriber
, 我们来关注下转换后的Subscriber
对转换前的Subscriber
做了些什么事。
首先它是一个ObserveOnSubscriber
类, 既然是Subscriber
那么肯定有onNext
, onComplete
和onError
看最主要的onNext
@Override
public void onNext(final T t) {if (isUnsubscribed() || finished) {return;}if (!queue.offer(on.next(t))) {onError(new MissingBackpressureException());return;}schedule();
}
好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule
启动传入的worker
我们这里需要注意下:
在调用
observeOn
前的序列,把结果传入到onNext
就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber
继续。
protected void schedule() {if (counter.getAndIncrement() == 0) {recursiveScheduler.schedule(this);}
}
recursiveScheduler
就是之前我们传入的Scheduler,我们一般会在observeOn
传入AndroidScheluders.mainThread()
对吧、
接下去,我们看下在scheduler
中调用的call
方法,这里只列出主要带代码
@Override
public void call() {...final Subscriber<? super T> localChild = this.child;for (;;) {...boolean done = finished;Object v = q.poll();boolean empty = v == null;if (checkTerminated(done, empty, localChild, q)) {return;}if (empty) {break;}localChild.onNext(localOn.getValue(v));...}if (emitted != 0L) {request(emitted);}
}
OK,在Scheduler
启动后, 我们在Observable.subscribe(a)
传入的a
就是这里的child
, 我们看到,在call
中终于调用了它的onNext
方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn
的线程上的。
那么总结起来的结论就是:
observeOn
对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
observeOn
对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber
复杂情况
我们经常多次使用subscribeOn
切换线程,那么以后是否可以组合observeOn
和subscribeOn
达到自由切换的目的呢?
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn
调用之后,再调用subscribeOn
是无效的,原因是什么?
因为subscribeOn
改变的是subscribe
这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn
的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn
调用了,也只是改变observeOn
这个消费者所在的线程,和OperatorObserveOn
中存储的原始消费者一点关系都没有,它还是由observeOn
控制。
总结
如果我们有一段这样的序列
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特别注意
.subscribe(handleData)
假设这里我们是在主线程上调用这段代码,
那么
操作1
,操作2
是在io线程上,因为之后subscribeOn
切换了线程
操作3
,操作4
也是在io线程上,因为在subscribeOn
切换了线程之后,并没有发生改变。
操作5
,操作6
是在main线程上,因为在他们之前的observeOn
切换了线程。特别注意那一段,对于
操作5
和操作6
是无效的
再简单点总结就是
subscribeOn
的调用切换之前的线程。observeOn
的调用切换之后的线程。observeOn
之后,不可再调用subscribeOn
切换线程
=========
续 特别感谢@扔物线给的额外的总结
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件
只有第一subscribeOn() 起作用(所以多个 subscribeOn() 毛意义)
这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn()
observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程
迷之RxJava —— 线程切换相关推荐
- RxJava 线程切换
前言 在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程.如果觉得源码枯燥可以直接移至文末看图理解. 实例代码 Observable.create(new Observ ...
- android rxjava 多线程,你真的了解RxJava的线程切换吗?
使用RxJava可以轻松地实现线程切换,所以在Android中常被用来替代AsyncTask.Handler等原生工具类.使用起来虽然简单,但如果不了解其背后的基本原理,很可能因为使用不当而写出bug ...
- 线程切换哪家强?RxJava与Flow的操作符对比
作者:fundroid 链接:https://juejin.cn/post/6943037393893064734#heading-13 Flow作为Coroutine版的RxJava,同RxJava ...
- java子线程切换到主线程_Android子线程切换到UI线程方法总结
线程切换 通过消息发送(发布)和接收(订阅)的方式切换的. 1 .Handler 子线程(非UI线程)调用handler对象sendMessage(msg)方法,将消息发送给关联Looper,Loop ...
- Rxjava2原理流程+操作符+线程切换 浅析~
0.前言 没拜读过强大的代码就建议去稍微看一下rxjava2的原理,并不难懂.写的非常的好,也能领略到大佬写的代码有多么的强.里面的设计模式真的牛逼 1.Rxjava2 Rxjava2用于我们来做响应 ...
- 理解RxJava线程模型
RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解.(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑 ...
- 理解 RxJava 线程模型
http://blog.saymagic.cn/2016/08/20/understand-rxjava-threading-model.html 主题 线程 RxJava RxJava作为目前一款超 ...
- 应用退出前不让线程切换_用户级线程和内核级线程,你分清楚了吗?
前天晚上有个伙伴私信我说在学进程和线程,问我有没有好的方法和学习教程,刚好我最近也在备相关的课. 班上不少学生学的还是很不错的.拿班上小白和小明的例子吧(艺名哈).小明接受能力很强,小白则稍差些. 关 ...
- 模拟线程切换 C++
为什么80%的码农都做不了架构师?>>> 前言: 本文主要是剖析NachOs的线程切换原理,并通过一个简化的例子(就是将线程部分代码抽取出来再加以修改) 来说明.本文 gith ...
最新文章
- 英特尔携手中科院计算所建立中国首个 oneAPI 卓越中心
- 2020年,这些学者归国任教
- 非交互模式修改Ubuntu密码的命令
- 学python要多少钱-python学习费用多少合适
- rssi室内定位算法原理_三分钟看懂蓝牙室内定位 值得分享
- 实例32:python
- Git下使用Beyond Compare作为比较和合并工具
- 进入大厂的面试经验详细总结(P7 拿 offer)
- Install Kernel 3.10 on CentOS 6.5
- java 利用同步工具类控制线程
- java,python,scala发送http请求
- linux vim创建文件配置文件,vim linux 强大的配置文件
- 必须收藏!130 个相见恨晚的超实用网站,一次性分享出来
- 分享个网站首页弹窗代码
- ChucK学习笔记(零)——前言
- Netty实战二-实现UDP的单播和广播
- 软件工程作业一:从产品经理人角度分析微信求职招聘小程序
- RCWL-0516微波雷达模块检测人体移动(发光二极管)
- Python之文件处理-JSON文件
- matlab中buttord用法_matlab butter函数
热门文章
- android paint 字体,Android绘图之Paint的使用方法详解
- MultipartFile与File的互转
- 1.3.	一只大象口渴了,要喝20升水才能解渴,但现在只有一个深h厘米,底面半径为r厘米的小圆桶(h和r都是整数)。问大象至少要喝多少桶水才会解渴。编写程序输入半径和高度,输出需要的桶数(一定是整数)
- AssetManager
- 前端开发常用到的flex布局简单范例
- 华为S5720s配置dns解析使用ftp备份
- 常见的Linux操作系统
- OpenGL ES之glOrtho函数
- Python3 PNG文件格式及根据CRC检验码修复图片高度
- springBoot下java代码mysql数据库定时任务(创建表)