RxAndroid dispose实现原理
示例如下:
final Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {public void subscribe(@NonNull ObservableEmitter<String> e) {for (int i=0; i<100; i++) {if (e.isDisposed())break;Thread.sleep(1000);e.onNext(String.valueOf(i));}e.onComplete();}}).map(new Function<Integer, String>() {public String apply(Integer number) {return number.toString();}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();new Handler().postDelayed(new Runnable() {public void run() {disposable.dispose();}}, 3000);
问题:
为什么调用disposable.dispose之后,ObservableEmitter.isDisposed为true?
分析:
经过调试发现,disposable.dispose这一句执行后最终会调用ObservableCreate.CreateEmitter.dispose。
【connect】在observer.onSubscribe时连接成一条线
- Disposable是什么?
- 怎样连接起来?
- 怎样跨越线程?
- 怎样跨越操作符?
1,Disposable是一个interface
/*** Represents a disposable resource.*/
public interface Disposable {/*** Dispose the resource, the operation should be idempotent.*/void dispose();/*** Returns true if this resource has been disposed.* @return true if this resource has been disposed*/boolean isDisposed();
}
2,当subscribe调用后,会构造一个LambdaObserver,最终返回的Dispose就是这个。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) {ObjectHelper.requireNonNull(onNext, "onNext is null");ObjectHelper.requireNonNull(onError, "onError is null");ObjectHelper.requireNonNull(onComplete, "onComplete is null");ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);subscribe(ls);return ls;
}
假设只有Observable.create与subscribe,那么,将会进入ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {// observer即传入的LambdaObserverCreateEmitter<T> parent = new CreateEmitter<T>(observer);// 重点在此,看看里面做了什么observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}
LambdaObserver.onSubscribe
@Override
public void onSubscribe(Disposable d) {// 注意,继承了AtomicReference<Disposable>的类内部都有一个// volatile Disposable value,// 此处就是保存传入的Disposable(CreateEmitter),如此,// 当LambdaObserver的dispose调用时,会调用到CreateEmitter.disposeif (DisposableHelper.setOnce(this, d)) {try {onSubscribe.accept(this);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);d.dispose();onError(ex);}}
}
3,先看subscribeOn,ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);// downstream保存SubscribeOnObserver对象(Disposable)observer.onSubscribe(parent);// 任务抛到线程,并且将其Disposable对象保存parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
当source.subscribe调用后,SubscribeOnObserver内onSubscribe如下
@Override
public void onSubscribe(Disposable d) {// 将上游observable保存到upstream中DisposableHelper.setOnce(this.upstream, d);
}
当LambdaObserver.dispose调用后,SubscribeOnObserver内dispose被调用
@Override
public void dispose() {// 触发上游disposeDisposableHelper.dispose(upstream);// dispose任务,任务在线程中执行会先判断isDisposed再决定是否执行DisposableHelper.dispose(this);
}
再看observeOn,ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();// ObserveOnObserver.onNext时,会在worker中执行observer.onNextsource.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}
}
ObservableObserveOn.onSubscribe
@Override
public void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {// 将CreateEmitter保存起来this.upstream = d;if (d instanceof QueueDisposable) {@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<T>(bufferSize);// downstream保存此Disposable对象downstream.onSubscribe(this);}
}
LambdaObserver.dispose调用后,会执行ObservableObserveOn.dispose
@Override
public void dispose() {if (!disposed) {disposed = true;// 中止上游任务upstream.dispose();// 中止线程中的任务worker.dispose();if (getAndIncrement() == 0) {queue.clear();}}
}
4,以map为例
Observable.create().map().subscribe();
内部保存upstream,并把自己保存在downstream的Disposable对象中。
从map.subscribeActual开始
@Override
public void subscribeActual(Observer<? super U> t) {// source为ObservableCreate// t为LambdaObserversource.subscribe(new MapObserver<T, U>(t, function));
}
source.subscribe调用后会进入MapObserver.onSubscribe,在基类中BasicFuseableObserver
@Override
public final void onSubscribe(Disposable d) {// d为CreateEmitter,保存为upstreamif (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;if (d instanceof QueueDisposable) {this.qd = (QueueDisposable<T>)d;}if (beforeDownstream()) {// downstream为LambdaObserver,在构造函数中赋值的// 该句调用后,LambdaObserver保存此对象Disposabledownstream.onSubscribe(this);afterDownstream();}}
}
因此,LambdaObserver.dispose调用后,会调用MapObserver.dispose
@Override
public void dispose() {// upstream即上面的CreateEmitterupstream.dispose();
}
总结:
下游会保存上游的Disposable对象,从而,在下游开始调用dispose后,实际上是去调用上游的dispose,如此,直到第一个Observable对象调用dispose停止,从而达到每个Disposable对象都为DISPOSED。
RxAndroid dispose实现原理相关推荐
- RxJava的Disposable及其工作原理
一.关于 Disposable 任何订阅者模式的代码,都需要注意注册与注销的配对出现,否则会出现内存泄漏.RxJava2 提供了 Disposable( RxJava1 中是 Subscription ...
- 响应式编程简介之:Reactor
文章目录 简介 Reactor简介 reactive programming的发展史 Iterable-Iterator 和Publisher-Subscriber的区别 为什么要使用异步reacti ...
- 那些年收藏的技术文章(一) CSDN篇
#Android ##Android基础及相关机制 Android Context 上下文 你必须知道的一切 Android中子线程真的不能更新UI吗? Android基础和运行机制 Android任 ...
- Android复习系列④之《Android进阶》
Android进阶 1 Okhttp OkHttpClient相当于配置中心, 所有的请求都会共享这些配置(例如出错是否重试.共享的连接池) . 1.OkHttpCLient中的配置主要有: Disp ...
- 那些年收藏的技术文章(一)-CSDN篇
Android Android基础及相关机制 Android View体系 Android坐标相关 Android事件机制及相关问题 Android官方组件 Android Service Andro ...
- RxSwift之深入解析dispose源码的实现原理
一.前言 任何对象都有生命周期,有创建就要销毁.OC 中有 init 和 dealloc,swift 有 init 和 deinit,RxSwift 也不例外,有 create 和 dispose. ...
- 行动力决定了一个人的成败,有想法,就去做! C#的内存管理原理解析+标准Dispose模式的实现
尽管.NET运行库负责处理大部分内存管理工作,但C#程序员仍然必须理解内存管理的工作原理,了解如何高效地处理非托管的资源,才能在非常注重性能的系统中高效地处理内存. C#编程的一个优点就是程序员不必担 ...
- RxJava和RxAndroid
现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...
- 重拾Android之路(五)RxJava和RxAndroid
现在RxJava和RxAndroid越来越火爆,自己在业余时间也学习了一下,感觉确实很好用,之前 为了完成页面刷新,数据请求,组件信息传递的时候,要使用handler,真的是逻辑思路很强,稍微不注意, ...
最新文章
- eplan导出部件汇总表_干货分享:西门子产品数据表导入到博途和EPLAN应用举例...
- 关于maven仓库中的_remote.repositories
- Boost:用成员函数测试bind <void>
- 空值为0非空为1_万达广场4周年,1降到底!0元送万张杂技团门票、人气餐饮6.8折,这波周年庆我先锁为敬...
- 莫比乌斯,欧拉函数题目练习(完结)
- 写给我的女神,一个用灵魂歌唱的小精灵
- 转] 两种自定义表单设计方案
- Joyoshare VidiKit教程:如何将字幕添加到WMV电影中?
- 大前端(全栈)学习路线指南
- PTAM特征点法跟踪和建图 SLAM FAST Patch
- 仓库盘点的四大方法和盘点流程
- php一般培训呢多久,php的培训一般课程是多久
- win7浏览器主页修改不过来_IE浏览器主页无法修改的两种解决办法
- 分子动力学基本概念(持续更新)
- JVM之运行时数据区(方法区)
- GIS经纬度坐标转换为unity3D的世界坐标
- 安装向日葵后,还是连不上问题
- 偶然发现的一篇文章 激励自己吧。
- android中文件加密和解密的实现
- 淘宝秒杀(Python源代码)