示例如下:

    final Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {public void subscribe(@NonNull ObservableEmitter<String> e) {for (int i=0; i<100; i++) {if (e.isDisposed())break;Thread.sleep(1000);e.onNext(String.valueOf(i));}e.onComplete();}}).map(new Function<Integer, String>() {public String apply(Integer number) {return number.toString();}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();new Handler().postDelayed(new Runnable() {public void run() {disposable.dispose();}}, 3000);

问题:

为什么调用disposable.dispose之后,ObservableEmitter.isDisposed为true?

分析:

经过调试发现,disposable.dispose这一句执行后最终会调用ObservableCreate.CreateEmitter.dispose。

【connect】在observer.onSubscribe时连接成一条线

  1. Disposable是什么?
  2. 怎样连接起来?
  3. 怎样跨越线程?
  4. 怎样跨越操作符?

1,Disposable是一个interface

/*** Represents a disposable resource.*/
public interface Disposable {/*** Dispose the resource, the operation should be idempotent.*/void dispose();/*** Returns true if this resource has been disposed.* @return true if this resource has been disposed*/boolean isDisposed();
}

2,当subscribe调用后,会构造一个LambdaObserver,最终返回的Dispose就是这个。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {ObjectHelper.requireNonNull(onNext, "onNext is null");ObjectHelper.requireNonNull(onError, "onError is null");ObjectHelper.requireNonNull(onComplete, "onComplete is null");ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls;
}

假设只有Observable.create与subscribe,那么,将会进入ObservableCreate.subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {// observer即传入的LambdaObserverCreateEmitter<T> parent = new CreateEmitter<T>(observer);// 重点在此,看看里面做了什么observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}

LambdaObserver.onSubscribe

@Override
public void onSubscribe(Disposable d) {// 注意,继承了AtomicReference<Disposable>的类内部都有一个// volatile Disposable value,// 此处就是保存传入的Disposable(CreateEmitter),如此,// 当LambdaObserver的dispose调用时,会调用到CreateEmitter.disposeif (DisposableHelper.setOnce(this, d)) {try {onSubscribe.accept(this);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);d.dispose();onError(ex);}}
}

3,先看subscribeOn,ObservableSubscribeOn

@Override
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);// downstream保存SubscribeOnObserver对象(Disposable)observer.onSubscribe(parent);// 任务抛到线程,并且将其Disposable对象保存parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

当source.subscribe调用后,SubscribeOnObserver内onSubscribe如下

@Override
public void onSubscribe(Disposable d) {// 将上游observable保存到upstream中DisposableHelper.setOnce(this.upstream, d);
}

当LambdaObserver.dispose调用后,SubscribeOnObserver内dispose被调用

@Override
public void dispose() {// 触发上游disposeDisposableHelper.dispose(upstream);// dispose任务,任务在线程中执行会先判断isDisposed再决定是否执行DisposableHelper.dispose(this);
}

再看observeOn,ObservableObserveOn

@Override
protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();// ObserveOnObserver.onNext时,会在worker中执行observer.onNextsource.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}
}

ObservableObserveOn.onSubscribe

@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {// 将CreateEmitter保存起来this.upstream = d;if (d instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);// downstream保存此Disposable对象downstream.onSubscribe(this);}
}

LambdaObserver.dispose调用后,会执行ObservableObserveOn.dispose

@Override
public void dispose() {if (!disposed) {disposed = true;// 中止上游任务upstream.dispose();// 中止线程中的任务worker.dispose();if (getAndIncrement() == 0) {queue.clear();}}
}

4,以map为例

Observable.create().map().subscribe();

内部保存upstream,并把自己保存在downstream的Disposable对象中。

从map.subscribeActual开始

@Override
public void subscribeActual(Observer<? super U> t) {// source为ObservableCreate// t为LambdaObserversource.subscribe(new MapObserver<T, U>(t, function));
}

source.subscribe调用后会进入MapObserver.onSubscribe,在基类中BasicFuseableObserver

@Override
public final void onSubscribe(Disposable d) {// d为CreateEmitter,保存为upstreamif (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;if (d instanceof QueueDisposable) {this.qd = (QueueDisposable<T>)d;}if (beforeDownstream()) {// downstream为LambdaObserver,在构造函数中赋值的// 该句调用后,LambdaObserver保存此对象Disposabledownstream.onSubscribe(this);afterDownstream();}}
}

因此,LambdaObserver.dispose调用后,会调用MapObserver.dispose

@Override
public void dispose() {// upstream即上面的CreateEmitterupstream.dispose();
}

总结:

下游会保存上游的Disposable对象,从而,在下游开始调用dispose后,实际上是去调用上游的dispose,如此,直到第一个Observable对象调用dispose停止,从而达到每个Disposable对象都为DISPOSED。

RxAndroid dispose实现原理相关推荐

  1. RxJava的Disposable及其工作原理

    一.关于 Disposable 任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏.RxJava2 提供了 Disposable( RxJava1 中是 Subscription ...

  2. 响应式编程简介之:Reactor

    文章目录 简介 Reactor简介 reactive programming的发展史 Iterable-Iterator 和Publisher-Subscriber的区别 为什么要使用异步reacti ...

  3. 那些年收藏的技术文章(一) CSDN篇

    #Android ##Android基础及相关机制 Android Context 上下文 你必须知道的一切 Android中子线程真的不能更新UI吗? Android基础和运行机制 Android任 ...

  4. Android复习系列④之《Android进阶》

    Android进阶 1 Okhttp OkHttpClient相当于配置中心, 所有的请求都会共享这些配置(例如出错是否重试.共享的连接池) . 1.OkHttpCLient中的配置主要有: Disp ...

  5. 那些年收藏的技术文章(一)-CSDN篇

    Android Android基础及相关机制 Android View体系 Android坐标相关 Android事件机制及相关问题 Android官方组件 Android Service Andro ...

  6. RxSwift之深入解析dispose源码的实现原理

    一.前言 任何对象都有生命周期,有创建就要销毁.OC 中有 init 和 dealloc,swift 有 init 和 deinit,RxSwift 也不例外,有 create 和 dispose. ...

  7. 行动力决定了一个人的成败,有想法,就去做! C#的内存管理原理解析+标准Dispose模式的实现

    尽管.NET运行库负责处理大部分内存管理工作,但C#程序员仍然必须理解内存管理的工作原理,了解如何高效地处理非托管的资源,才能在非常注重性能的系统中高效地处理内存. C#编程的一个优点就是程序员不必担 ...

  8. RxJava和RxAndroid

    现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...

  9. 重拾Android之路(五)RxJava和RxAndroid

    现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...

最新文章

  1. eplan导出部件汇总表_干货分享:西门子产品数据表导入到博途和EPLAN应用举例...
  2. 关于maven仓库中的_remote.repositories
  3. Boost:用成员函数测试bind <void>
  4. 空值为0非空为1_万达广场4周年,1降到底!0元送万张杂技团门票、人气餐饮6.8折,这波周年庆我先锁为敬...
  5. 莫比乌斯,欧拉函数题目练习(完结)
  6. 写给我的女神,一个用灵魂歌唱的小精灵
  7. 转] 两种自定义表单设计方案
  8. Joyoshare VidiKit教程:如何将字幕添加到WMV电影中?
  9. 大前端(全栈)学习路线指南
  10. PTAM特征点法跟踪和建图 SLAM FAST Patch
  11. 仓库盘点的四大方法和盘点流程
  12. php一般培训呢多久,php的培训一般课程是多久
  13. win7浏览器主页修改不过来_IE浏览器主页无法修改的两种解决办法
  14. 分子动力学基本概念(持续更新)
  15. JVM之运行时数据区(方法区)
  16. GIS经纬度坐标转换为unity3D的世界坐标
  17. 安装向日葵后,还是连不上问题
  18. 偶然发现的一篇文章 激励自己吧。
  19. android中文件加密和解密的实现
  20. 淘宝秒杀(Python源代码)

热门文章

  1. Word如何将波浪号(~)如何打到中间
  2. 数据结构教程(第五版 李春葆 上机实验题4 验证性实验)
  3. Windows系统下安装VirtualBox及安装Ubuntu16.04
  4. 每天一个kali无线命令--airmon-ng
  5. 一枚芯片的实际成本是多少?(2)晶片的成本
  6. Appium学习之---MonkeyRunner环境搭建
  7. C#中实现多态的三种方式:抽象类,虚方法,接口
  8. 集团企业IT数据安全及权限管理制度
  9. 面试题:mysql员工表和组织表相关的一些题
  10. 腾讯社区开放平台.NET SDK在Mono下运行