拆轮子系列--RxJava理解(三)--observeOn
本系列文章如下:
- 拆轮子系列--RxJava前奏篇
- 拆轮子系列--RxJava理解(一)--Map解析
- 拆轮子系列--RxJava理解(二)--subscribeOn
- 拆轮子系列--RxJava理解(三)--observeOn
上一篇文章主要介绍了RxJava
中线程调度的核心方法之一subscribeOn
,本篇文章继续分析RxJava
中线程调度的另一个核心方法--observeOn
。本篇文章基于RxJava2
源码进行分析。 本文的大纲如下:
- 一个具体的例子
- observeOn源码分析
- 总结
1 .一个具体的例子
首先,以一个具体的例子分析observeOn
的原理:
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("1");Thread.sleep(1000);e.onNext("2");Thread.sleep(1000);e.onComplete();}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Exception {Log.e("TAG", "map1--thread=" + Thread.currentThread().getName() + "-s:" + s);return Integer.valueOf(s);}}).subscribeOn(AndroidSchedulers.mainThread()).map(new Function<Integer, Long>() {@Overridepublic Long apply(Integer integer) throws Exception {Log.e("TAG", "map2--thread=" + Thread.currentThread().getName() + "-integer:" + integer);return Long.valueOf(integer);}}).observeOn(Schedulers.io()).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {Log.e("TAG", "map3--thread=" + Thread.currentThread().getName() + "-aLong:" + aLong);return String.valueOf(aLong);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e("TAG", "Consumer--thread=" + Thread.currentThread().getName() + "-String:" + s);}});
复制代码
如果你了解map
这个操作符,那么这个例子你很快就能得运行结果,如果你对于map
这个操作符不太清楚,建议回顾下之前的文章拆轮子系列--RxJava理解(一)--Map解析。接下来我们看看本例的程序运行结果:
E/TAG: map1--thread-main-s:1
E/TAG: map2--thread-main-integer:1
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:1
E/TAG: map1--thread-main-s:2
E/TAG: map2--thread-main-integer:2
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:2
复制代码
细看下之前的例子,可能有些朋友已经发现了一个异常操作Thread.sleep(1000);
。为什么在发射元素的时候睡了一秒钟?这个是为什么呢?哈哈,先不急,下文将一一道来。 从上面运行的结果我们发现,除了observeOn()
下面的部分运行在observeOn()
指定的线程中,其余的部分运行在subscribeOn()
指定的线程,这个是为什么呢?下面再分析,这里先给个结论:RxJava中,observeOn()是用来指定下游observer回调发生的线程。对应上面的例子,也就是map3与Consumer运行的线程。
2. observeOn源码分析
为什么会产生上面的结果?我们来看看源码:
@CheckReturnValue@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> observeOn(Scheduler scheduler) {return observeOn(scheduler, false, bufferSize());}@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {...return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));}
复制代码
从源码中我们可以看出,调用observeOn()
方法返回了一个Observable
对象,而真正的操作是在ObservableObserveOn()
这个方法里面,接下来我们看看ObservableObserveOn()
这个方法到底干了什么事情:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}
复制代码
我们主要看看ObservableObserveOn
中主要的实现方法subscribeActual()
。在这个方法中,首先创建了一个指定的事物worker
,然后将worker
作为参数创建了一个ObserveOnObserver
对象,接下来我们分析这个ObserveOnObserver
中具体的逻辑:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {this.actual = actual;this.worker = worker;this.delayError = delayError;this.bufferSize = bufferSize;}@Overridepublic void onSubscribe(Disposable s) {if (DisposableHelper.validate(this.s, s)) {this.s = s;if (s instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) s;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;done = true;actual.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;actual.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);actual.onSubscribe(this);}}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != QueueDisposable.ASYNC) {queue.offer(t);}schedule();}...
复制代码
ObserveOnObserver
实现了Observer
这个接口,重写了Observer
里面的方法,我们看看主要的方法onNext()
。在该方法中,首先会向queue()
中添加元素,我们主要关注schedule()
这个方法,进入schedule()
:
void schedule() {if (getAndIncrement() == 0) {worker.schedule(this);}}
复制代码
上述方法将实现了Runnable
接口的ObserveOnObserver
对象放入了worker
里面进行操作,直白的说,就是该ObserveOnObserver
对象的操作会被放入一个线程池中,寻找合适的线程运行。 主要的问题来了,当ObserveOnObserver
对象寻找到一条线程后执行了什么操作呢?继续看源码:
@Overridepublic void run() {if (outputFused) {drainFused();} else {drainNormal();}}//我们主要看看drainNormal()这个方法:void drainNormal() {int missed = 1;final SimpleQueue<T> q = queue;final Observer<? super T> a = actual;for (;;) {if (checkTerminated(done, q.isEmpty(), a)) {return;}for (;;) {boolean d = done;T v;try {v = q.poll();} catch (Throwable ex) {Exceptions.throwIfFatal(ex);s.dispose();q.clear();a.onError(ex);return;}boolean empty = v == null;if (checkTerminated(d, empty, a)) {return;}if (empty) {break;}a.onNext(v);}missed = addAndGet(-missed);if (missed == 0) {break;}}}
复制代码
其实这个方法就是一个死循环,它不断的从queue
取出元素然后交给由下一级传递上来的observer
来执行onNext()
方法。而这整个从queue
中取元素到由下级的observer
执行onNext()
方法,都是执行在scheduler( Scheduler.Worker w = scheduler.createWorker();)
所指定的线程中。总的来说,ObserveOnObserver
会将下一级传递过来的observer
进行封装,让它独立的运行在scheduler
指定的线程中去处理元素。 再回到前面的例子,我们在observeOn()
操作符后面接着使用了一个map()
操作符,那么此时的流程又是怎么样的呢?我们以一张图来进行说明:
从上图中可以看到,observeOn
后面跟了一个map()
,那么在drainNormal ()
方法中a.onNext(v)
的a
就是经过map
转换过的observer
,接着调用map
中o.onNext(transformer.call(t))
,此时保证了transformer.call()
方法运行在observeOn()
所指定的线程中,而o
就是observer2
。
3. 总结
使用observeOn()
这个操作符,会在原来Observer
发射元素的时候,将元素一个个的添加到一个指定的队列中,然后异步(使用一个新的线程)的从该队列中取出元素,将取出的元素交给下一级的observer
的onNext()
方法来处理元素。
回到前面抛出的一个问题,我们在发射元素的时候sleep了1秒钟
,这个是为什么呢?说明一下:因为我们取元素的过程是异步操作的,那么很有可能出现某个线程的转换执行完毕之后才执行另一个线程的转换操作,最后与我们期望的结果不太一样。
当我们去掉例子中sleep()
操作,其结果如下:
E/TAG: map1--thread=main-s:1
E/TAG: map2--thread=main-integer:1
E/TAG: map1--thread=main-s:2
E/TAG: map2--thread=main-integer:2
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:1
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:2
复制代码
好了,关于RxJava
中线程调度的核心方法observeOn
操作符已经介绍完毕。
如果文章中有什么疏漏或者错误的地方,还望各位指正,你们的监督是我最大的动力,谢谢!
拆轮子系列--RxJava理解(三)--observeOn相关推荐
- 拆轮子系列--RxJava理解(一)--Map解析
本系列文章如下: 拆轮子系列--RxJava前奏篇 拆轮子系列--RxJava理解(一)--Map解析 拆轮子系列--RxJava理解(二)--subscribeOn 拆轮子系列--RxJava理解( ...
- 拆轮子系列之教你一步步写验证码控件
拆轮子系列之教你一步步写验证码控件 前言 先看看效果 怎么样不错吧?别急下面我就一步一步的教你实现. 用到的知识点总结: 1.Canvas和pint的使用,我们用它画点,线,字 2.View的基本用法 ...
- Mininet系列实验(三):Mininet命令延伸实验扩展
Mininet系列实验(三):Mininet命令延伸实验扩展 1 实验目的 熟悉Mininet自定义拓扑三种实现方式:命令行创建.Python脚本编写.交互式界面创建. 2 实验原理 Mininet ...
- SLAM导航机器人零基础实战系列:(三)感知与大脑——5.机器人大脑嵌入式主板性能对比...
SLAM导航机器人零基础实战系列:(三)感知与大脑--5.机器人大脑嵌入式主板性能对比 摘要 在我的想象中机器人首先应该能自由的走来走去,然后应该能流利的与主人对话.朝着这个理想,我准备设计一个能自由 ...
- 一点就分享系列(理解篇_4+实践篇_2)”干货-全网最简且全”的理解!2020年了!您只知道GAN?ECCV超分论文“IRN” 全家桶大放送!!
一点就分享系列(理解篇_4+实践篇_2)"最新干货"--2020 ECCV 超分论文之一"IRN"(更新中..) 最近开始了csdn坚持原创之旅,目前到了理解篇 ...
- 一点就分享系列(理解篇3)—Cv任务“新世代”之Transformer系列 (中篇-视觉模型篇DETR初代版本)
一点就分享系列(理解篇3)-Cv任务"新世代"之Transformer系列 (中篇-视觉模型篇) 对于上篇介绍transformer得原理,自认为把细节讲得很详细了,作为" ...
- 一点就分享系列(理解篇3)—Cv任务“新世代”之Transformer(下篇)提前“cv领域展开”——快速学习“视觉transformer的理解”+“一些吐槽”
一点就分享系列(理解篇3)Cv任务"新世代"之Transformer(下篇)--"cv领域展开" 提示:本篇内容为下篇,如感兴趣可翻阅上和中篇! 理解篇3 上 ...
- 重复造轮子系列——基于FastReport设计打印模板实现桌面端WPF套打和商超POS高度自适应小票打印...
重复造轮子系列--基于FastReport设计打印模板实现桌面端WPF套打和商超POS高度自适应小票打印 一.引言 桌面端系统经常需要对接各种硬件设备,比如扫描器.读卡器.打印机等. 这里介绍下桌面端 ...
- 大恶人吉日嘎拉之走火入魔闭门造车之.NET疯狂架构经验分享系列之(三)商业逻辑代码部分...
其实,写好几套管理软件后发现,其实大多管理软件,很多也不过是数据库设计得合理一些后 就是把数据搬来搬去而已,添加.删除.修改,然后进行一些统计分析而已.其实写代码都是 那些简单的程序Copy来Copy ...
最新文章
- python flask跨域_Ajax与Flask传值的跨域问题
- 填充磁盘空间的工具和方法
- 【JavaScript】【PPT】继承的本质
- Python TeamViewer批量提交密码重置--分析与实现
- Linux内存管理之mmap详解
- String类、StringBuffer类、StringBuilder类的区别
- “五月天才不短咧” TME live这样焕发线上Live演出的生命力
- [地图SkyLine二次开发]框架(2)
- 电脑用电量_诡异!北山一空置房子用电量噌噌上涨,工作人员打开门一看……_媒体_澎湃新闻...
- 请绘制计算机串行通信原理图,单片机实验报告格式6
- NGINX配置gzip请求自动解压
- 远程控制软件TeamViewer
- 华为机试OD真题 javaScript和java 叠积木 堆积木
- 汽车试验数据管理(TDM系统)的特点分析及解决方案
- js实现页面定时跳转
- JS计算字符串在浏览器中显示的宽度
- 一起用Python做个自动化短视频生成脚本,实现热门视频流水线生产!
- [Vue源码解析] patching算法
- NXP S32K1 FlexTimer模块
- 解决非系统盘出现Program Files文件夹以及Program Files下的ModifiableWindowsApps文件夹无法删除的问题。