在Rxjava中Observable 有 Hot 和 Cold之分.

Hot Observable:

Hot Observable无论有没有观察者进行订阅事件始终都会发生. 当Hot Observable 有多个订阅者时(即多个订阅者进行订阅时),

Hot Observable与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息.

Cold Observable:

Cold Observable是只有观察者进行订阅了,才开始执行发射数据流的代码.并且Cold Observable和Observer只能是一对一的关系,

当有多个不同的订阅者时,上游的Observable每有一个订阅者订阅一次就会重新执行一次发射数据流的代码. 也就是说,对于Cold Observable而言,有多个Observer的时候它们各自的事件是独立存在的

每个订阅者都会独立的接收到它们的数据流

1.Cold Observable 冷的Observable

Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG,"subscribe : " + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(10).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong);if(aLong.longValue() >= 9){e.onComplete();}}});}}).observeOn(Schedulers.newThread());//通过Observable的create just  fromXxx range timer等这些创建的都是Cold Observable即冷的Observable对象Observer<Long> observer1 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe1 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext1  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError1 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete1 :  Thread = " + Thread.currentThread().getName());}};Observer observer2 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe2 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG,"onNext2  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError2 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete2 :  Thread = " + Thread.currentThread().getName());}};//下游的两个订阅者分别在不同的时间进行事件的订阅observable.subscribe(observer1);SystemClock.sleep(500);observable.subscribe(observer2);

打印结果:

从打印结果上可以看出这两个订阅者在不同的时间点上订阅了上游的Observable 但是他们接收的数据是一样的

各自相互独立 只是在同一时刻他们接收的数据是不同的

2.通过publish()操作符将我们的Cold Observable转化成Hot Observable

//通过publish()操作符将我们的Cold Observable转化成Hot ObservableConnectableObservable<Long> publish = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG,"subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(5).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong.longValue());if (aLong.longValue() >= 4) {e.onComplete();}}});}}).observeOn(Schedulers.newThread())//通过publish()操作符将我们的Cold Observable转化成为一个ConnectableObservable的Hot Observable.publish();//生成的ConnectableObservable需要调用connect()才能够真正执行上游发射数据流的代码Disposable disposable = publish.connect();Observer<Long> observer1 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe1 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext1  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError1 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete1 :  Thread = " + Thread.currentThread().getName());}};Observer observer2 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe2 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG,"onNext2  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError2 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete2 :  Thread = " + Thread.currentThread().getName());}};publish.subscribe(observer1);SystemClock.sleep(300);publish.subscribe(observer2);

打印结果:

从打印结果可以看出下游的两个订阅者在不同的时间订阅了上游的Observable但是他们最终的在同一时刻接收到

的事件是一样的即这下游的两个订阅者是共享同一个事件的而且从log的打印结果也可以印证Cold Observable是

每有一个订阅者就会执行一次subscribe方法重新执行一次发射数据流的方法

而Hot Observable是不管有多少个订阅者只会执行一次发射数据流的subscribe方法

而且执行的顺序也不同:Cold Observable 的执行顺序是: Observer(onSubscribe()) -> Obervable(subscribe()

每有一个订阅者订阅就执行一次该方法) -> Observer(onNext()) -> Observer(onComplete()/onError())

Hot Observable的执行顺序是: Observable(subscribe() 不管有多少个订阅者该方法永远只执行一次)

-> Observer(onSubscribe()) -> Observer(onNext()) -> Observer(onComplete()/onError())

对于Cold Observableb必须要有订阅者订阅才开始发射数据,而通过publish()操作符将Cold Observable

转化成为了Hot Observable之后必须调用connect()方法才会满足不管有没有订阅者订阅数据Hot Observable

都会不断地发射数据的条件

publish()操作符返回的是一个ConnecatableObservable对象这个类是Observable的子类多了一下的几个方法:

connect() :ConnectableObservable如果不调用connect方法是不会触发上游的数据流的执行,当我们调用了connect()方法ConnectableObservable就会开始接受上游Observable发射的数据流

其实在ConnectableObservable的内部有一个PublishObserver对象,这个对象有两个作用:

1.当我们调用connect()方法的时候,PublishObserver开始接受上游Observable发射的数据,这也就是为什么当我们调用了

connect()但是其实下游没有订阅者订阅ConnectableObservable时上游就开始执行发射数据流的代码了

2.PublishObserver会在内部存储所有下游的订阅了上游ConnectableObservable的订阅者Observer对象,每当PublishObserver

接受到一个上游发射过来的数据,PublishObserver就会将接收到的结果依次分发给它内部存储的所有的订阅者Observer对象,

如果下游的Observer调用了dispose()方法取消了订阅,那么PublishObserver对象就会从自己的缓存中删除这个订阅者Observer

对象,同时下次接收到上游发射的数据的时候也不会分发给这个订阅者Observer对象.

以上我们可以中断下游订阅者Observer和ConnectableObservable之间的订阅关系那么我们有没有办法可以中断ConnectableObservable和上游发射数据流的Observable(即调用publish()的那个Observable)之间的订阅关系呢?

当然可以,我们可以通过调用connect()方法返回的Disposable对象来中断ConnectableObservable和上游发射数据流Observable之间的数 据订阅关系

//理解publish() connect()  Connectable的关系//通过publish()生成ConnectableObservable对象ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG,"subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong.longValue());}});}}).publish();//通过connect()方法让ConnectableObserbable开始订阅上游Obsevable发射的数据流同时返回Disposable对象//上游开始发射数据流 不管下游有没有订阅者订阅ConnectableObservable都会开始发射数据流Disposable disposable = connectableObservable.connect();Consumer<Long> consumer1 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer1: " + o.longValue());}};Consumer<Long> consumer2 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer2: " + o.longValue());}};SystemClock.sleep(200);Disposable disposable1 = connectableObservable.subscribe(consumer1);SystemClock.sleep(400);Disposable disposable2 = connectableObservable.subscribe(consumer2);SystemClock.sleep(500);//通过disposable1和disposable2中断下游订阅者和上游ConnectableObservable之间的订阅关系//但是ConnectableObservable还会继续接受上游Observable发射的数据流Log.d(TAG,"disposable1 取消了 订阅...........");disposable1.dispose();SystemClock.sleep(200);Log.d(TAG,"disposable2 取消了 订阅...........");disposable2.dispose();SystemClock.sleep(300);Log.d(TAG,"consumer1又重新开始订阅上游ConnectableObservable数据.......");connectableObservable.subscribe(consumer1);SystemClock.sleep(200);//中断ConnectableObservable和上游产生数据流的Observable之间的订阅关系//此时会停止ConnectableObservable把数据转发给下游的订阅者Log.d(TAG,"disposable  中断ConnectableObservable和上游产生数据流的Observable之间的订阅关系.......");disposable.dispose();SystemClock.sleep(300);//ConnectableObservable重新订阅上游的数据流//此时如果源Observable是一个Cold Observable的话会重新触发走一遍发射数据流的代码Log.d(TAG,"重新开始转发数据....");disposable = connectableObservable.connect();//consumer1和consumer2重新订阅ConnectableObservable此时consumer1 consumer2接收的是全新的从0开始的数据connectableObservable.subscribe(consumer1);connectableObservable.subscribe(consumer2);

打印结果:

connect(Consumer<? super Disposable> connection):和connect()方法一样 只是connect()是将Disposable对象作为了

  方法的返回值,而该方法是将Disposable对象传递给了Consumer对象进行处理,可以避免打破rxajva的调用链

refCount():通过我们对publish()和connect()操作符的理解我们知道publish()会返回一个ConnectableObservable对象通过

     connect()会使ConnectableObservable订阅上游的Observable并触发上游的Observable开始执行发射数据流的代码

我们可以通过connect()方法返回的Disposable对象来使ConnectableObservable和上游Observable之间的订阅关系取消,从而

也可以中断ConnectableObservable向下游的订阅者转发接收到的上游发过来的数据

但是我们如果每次都这样操作会很麻烦,因此refCount()就是帮我们自动调用connect()和dispose()方法来触发发射数据流和

终止订阅关系从而停止转发数据给下游的订阅者这么的一个操作

refCount()可以说是最常用的操作符了。他会把 ConnectableObservable变为一个通常的Observable但又保持了HotObservable的特性。也就是说,如果出现第一个Subscriber,他就会自动调用 connect()方法触发上游Observable执行发射数据流的代码,如果他开始接受之后,下游的 Subscribers全部dispose,那么他也会停止接受上游的数据, RxJava将他和publish合并为一个操作符 :share()

//通过RefCount将Hot Observable 转换成Cold ObservableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG,"subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong.longValue());}});}}).observeOn(Schedulers.newThread());//通过publish()将Cold Obserbable 转换成Hot Observable 即ConnectableObservable
//        ConnectableObservable<Long> connectableObservable = observable.publish();//通过RefCount()将Hot Observable 转换成一个普通的Obervable 即Cold Observable
//        Observable<Long> normalObservable = connectableObservable.refCount();//对于这两部我们可以通过share()操作符一步搞定Observable<Long> normalObservable = observable.share();Consumer<Long> consumer1 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer1: " + o.longValue());}};Consumer<Long> consumer2 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer2: " + o.longValue());}};SystemClock.sleep(100);Disposable disposable1 = normalObservable.subscribe(consumer1);SystemClock.sleep(300);Disposable disposable2 = normalObservable.subscribe(consumer2);SystemClock.sleep(200);disposable1.dispose();SystemClock.sleep(150);disposable2.dispose();SystemClock.sleep(100);normalObservable.subscribe(consumer1);SystemClock.sleep(150);normalObservable.subscribe(consumer2);

打印结果:

autoConnect()

autoConnect(int numberOfSubscribers)

autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)

以上这三个autoConnect()方法功能大致相同:

autoConnect()看名字就知道,他会自动链接,如果你单纯调用 autoConnect() ,那么,他会在你链接第一个 订阅者Subscriber 的时候调用 connect(),或者你调用 autoConnect(int Num),那么他将会再收到Num个 subscriber的时候连接。

但是,这个操作符的关键在于,由于我们为了链式调用,autoConnect会返回Observable对象给你,你不会在返回方法里获得一个 Disposable来控制上游的开关。 不过没问题,autoConnect提供了另一种重载方法 :
autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
他会在这个 Consumer传给你 你需要的那个总开关。而且,autoConnect并不会autoDisconnect,

也就是如果他即使没有subscriber了。他也会继续接受数据。

因此其实autoConnect()操作符和refCount()操作符的区别就是refCount()不能指定第几个订阅者订阅的时候才调用connect()

进行和源Observable的连接还有就是refCount()没有订阅者的时候回自动断开和源Observable的连接而autoConnect()

是不会执行这个操作的

另外Observable的publish()方法还有一个重载的方法:

public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)

所有对 Observable 的操作(例如:map,flatmap,zip,concat,merge等)都可以在 selector 中使用。你可以通过 selector 返回一个Observable对象(注意不是ConnectableObservable),让后来的订阅者都订阅到这个返回的Observable对象上,这样可以确保

所有的订阅者都在同一时刻收到同样的数据。

这个重载函数返回的是 Observable 而不是 ConnectableObservable所以如果想要获取ConnectableObservable对象需要调用无参的方法

3.通过Subject/Processor将Cold Observable转换成Hot Observable对象

//rxjava中通过Subject/Processor将Cold Observable转换成为Hot ObservableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG,"subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(5).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong.longValue());if (aLong.longValue() >= 4) {e.onComplete();}}});}}).observeOn(Schedulers.newThread());Observer<Long> observer1 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe1 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "onNext1  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError1 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete1 :  Thread = " + Thread.currentThread().getName());}};Observer observer2 = new Observer<Long>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe2 开始订阅  " + Thread.currentThread().getName());}@Overridepublic void onNext(Long aLong) {Log.d(TAG,"onNext2  : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError2 : " + e.getMessage() + " Thread = " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete2 :  Thread = " + Thread.currentThread().getName());}};/*** Subject/Processor这是一类非常特殊的对象他们既是Observable被观察者又是Observer观察者* 可以查看他们的实现,他们即实现了Observable又实现了ObserverSubject/Processor作为观察者它可以订阅目标Cold Observable是对方不管有没有订阅者都开始发送事件同时它又作为Observable可以转发或者发送事件序列,从而通过Subject/Processor的中继让Cold Observable变成了Hot Observable注意:Subject并不是线程安全的因此如果在并发的情况下可以通过toSerialized()方法将其变成线程安全的SerializedSubject对象进行使用*///创建Subject/Processor桥梁对象
//        PublishProcessor<Long> publishProcessor = PublishProcessor.create();Subject<Long> subject = PublishSubject.<Long>create().toSerialized();//通过桥梁对象先订阅observableobservable.subscribe(subject);//然后在让下游的订阅通过订阅Subject/Processor对象完成Cold Observable -> Hot Observable的转变subject.subscribe(observer1);SystemClock.sleep(300);subject.subscribe(observer2);

打印结果:

通过Subject/Processor也可以将我们的Cold Observable转换成为Hot observable,需要注意的如果当我们的Subject/Processor

在订阅了Cold Observable即调用了subscribe()方法之后

此时不管下游有没有订阅者订阅数据流都会马上开始进行数据流的发射而且只发射一次

4.replay()操作符也可以生成一个ConnectableObservable

//通过replay()操作符也可以生成ConnectableObservable对象Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG, "subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong);}});}});ConnectableObservable<Long> connectableObservable = observable.replay(3);
//        ConnectableObservable<Long> connectableObservable = observable.replay();Observable<Long> normalObservable = connectableObservable.refCount();Consumer<Long> consumer1 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer1: " + o.longValue());}};Consumer<Long> consumer2 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer2: " + o.longValue());}};SystemClock.sleep(100);normalObservable.subscribe(consumer1);SystemClock.sleep(500);normalObservable.subscribe(consumer2);

打印结果:

       

replay()方法和 publish()一样,会返回一个 ConnectableObservable

区别是, replay()会为新的subscriber重放他之前所收到的上游数据

replay()操作符 当和源 Observable 链接后,开始收集数据。当有 Observer 订阅的时候,就把收集到的数据转发给

Observer。然后和其他 Observer 同时接受数据如以上的打印所示
replay有一些重载的方法:

主要有这么几个参数:

1.无参数:会收集缓存源Observable的所有数据,直到内存溢出

2.buffersize:用来指定缓存的最大数量。当新的 Observer 订阅的时候,最多只能收到 bufferSize 个之前缓存的数据
            3.time, unit:用来指定一个数据存货的时间,新订阅的 Observer 只能收到时间不超过这个参数的数据

4.selector:用来转换重复的 Observable
            5.schedulers:指定我们缓存的数据在哪个线程上

5.cache()/cacheWithInitialCapacity(int capacity) 内部维护一个ConnectableObservable

 //cache()操作符Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {@Overridepublic void subscribe(final ObservableEmitter<Long> e) throws Exception {Log.d(TAG, "subscribe : 开始发射数据" + "  Thread = " + Thread.currentThread().getName());Observable.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.d(TAG, "上游发射了 : " + aLong.longValue() + " Thread = " + Thread.currentThread().getName());e.onNext(aLong);}});}}).cache()/*.cacheWithInitialCapacity(2)*/;  //该方法并不会限制缓存的数据的个数Consumer<Long> consumer1 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer1: " + o.longValue() + " thread : " + Thread.currentThread().getName());}};Consumer<Long> consumer2 = new Consumer<Long>() {@Overridepublic void accept(@NonNull Long o) throws Exception {Log.d(TAG,"consumer2: " + o.longValue() + " thread : " + Thread.currentThread().getName());}};SystemClock.sleep(100);observable.subscribe(consumer1);SystemClock.sleep(500);observable.subscribe(consumer2);

打印结果:

cache 操作函数和 replay 类似,但是隐藏了 ConnectableObservable ,并且不用管理disposable 了。当第一个 Observer

订阅的时候,内部的 ConnectableObservable 订阅到源 Observable。

后来的订阅者会收到之前缓存的数据,但是并不会重新订阅到源 Observable 上。

从上面示例中可以看到,只有当有订阅者订阅的时候,源 Observable 才开始执行。当第二个订阅者订阅的时候,会收到

之前缓存的数据。需要注意的是,如果所有的订阅者都取消订阅了 内部的 ConnectableObservable 不会取消订阅,这点和

refCount 不一样。只要第一个订阅者订阅了,内部的 ConnectableObservable 就连接到源 Observable上了并且不会取消

订阅了。 这点非常重要,因为当我们一旦订阅了,就没法取消源 Observable了,直到源 Observable 结束或者程序内存

溢出。 可以指定缓存个数的重载函数也没法解决这个问题,缓存限制只是作为一个优化的提示,并不会限制内部的缓存大小。

Rxjava中的Hot Observable 和 Cold Observable相关推荐

  1. Rxjava源码分析之IO.Reactivex.Observable

    Rxjava 源码系列目录 Rxjava源码分析之IO.Reactivex.Observer Rxjava源码分析之IO.Reactivex.CompositeDisposable Rxjava源码分 ...

  2. hot and cold observable

    A "hot" Observable may begin emitting items as soon as it is created, and so any observer ...

  3. rxjs的map和switchMap在SAP Spartacus中的应用 -将高阶Observable进行flatten操作

    switchMap相关文章 rxjs里switchMap operators的用法 通过rxjs的一个例子, 来学习SwitchMap的使用方法 rxjs switchMap的实现原理 rxjs的ma ...

  4. 如何形象地描述 RxJava 中的背压和流控机制?

    之前我在知乎上受邀回答过一个关于RxJava背压(Backpressure)机制的问题,今天我把它整理出来,希望对更多的人能有帮助. RxJava的官方文档中对于背压(Backpressure)机制比 ...

  5. rxjava背压_如何形象地描述RxJava中的背压和流控机制?

    之前我在知乎上受邀回答过一个关于RxJava背压(Backpressure)机制的问题,今天我把它整理出来,希望对更多的人能有帮助. RxJava的官方文档中对于背压(Backpressure)机制比 ...

  6. 深入Java泛型(四):RxJava中泛型的使用分析

    RxJava出现在我们的视线已经很久了,我自己也有阅读过非常多的文章,谈不上精通,但是勉强称得上会一些简单的使用,近日总是对这种响应式的编程,对RxJava魂牵梦绕,深刻的感觉到自己对泛型的认识,理解 ...

  7. RxJava中常见的几种Subject

    RxJava是什么? 原文是这样描述的: RxJava is a Java VM implementation of Reactive Extensions: a library for compos ...

  8. RxJava中的doOnSubscribe默认运行线程分析

    假设你对RxJava1.x还不是了解,能够參考以下文章. 1. RxJava使用介绍 [视频教程] 2. RxJava操作符   • Creating Observables(Observable的创 ...

  9. [译] RxJava 中的错误处理

    本文讲的是[译] RxJava 中的错误处理, 原文地址:Error handling in RxJava 原文作者:Dmitry Ryadnenko 译文出自:掘金翻译计划 本文永久链接:githu ...

  10. RxJava 中的map与flatMap

    1.map和flatMap都是接受一个函数作为参数(Func1) 2.map函数只有一个参数,参数一般是Func1,Func1的<I,O>I,O模版分别为输入和输出值的类型,实现Func1 ...

最新文章

  1. Chrome禁用浏览器跨域拦截
  2. 左右两个下拉列表框的选项互移及获值效果
  3. math-neon基于NEON指令的数学库
  4. 信阳哪些技校有学计算机的,2018年信阳十大技校排名 排名前十的学校有哪些
  5. linux树莓派mysql_树莓派4B(二):搭建LNMP(LINUX+NIGIX+MYSQL+PHP)+ Pi Dashboard
  6. 基于RBAC模型的通用企业权限管理系统
  7. 【转】Dynamics CRM:“the given key was not present in the dictionary”
  8. mac上配置mysql与redis server,并结合Pydev准备某爬虫环境
  9. select2,利用ajax高效查询大数据列表(可搜索、可分页)
  10. 利用数据缓存加速文件备份
  11. 浅谈PHP-FPM参数
  12. 【计算机图形学】Liang-Barsky裁剪算法(C++实现)
  13. R的农场 chebnear
  14. IE-LAB网络实验室:VPLS技术介绍
  15. 将加密的ppt文档解密,使之可以编辑
  16. java银联在线支付开发_银联在线支付案例代码
  17. Easyexcel·读取excel
  18. Logo tools
  19. linux——搭建NTP服务器
  20. Emoj cheat sheet

热门文章

  1. 罗技产品序列号追溯条码扫描系统
  2. Elasticsearch短语或近似匹配及召回率案例深入剖析-搜索系统线上实战
  3. 【文献阅读笔记】之Label Refinement Network for Coarse-to-Fine Semantic Segmentation
  4. 固态硬盘是什么接口_SATA M.2 PCIe?一分钟教你认识固态硬盘接口
  5. putty登录树莓派4超时
  6. 多址接入技术 FDMA TDMA CDMA NOMA
  7. Power BI----综合应用
  8. 【C++】有趣的数字
  9. Adaptive Object Detection Using Adjacency and Zoom Prediction
  10. Python-docx 读写 Word 文档:插入图片、表格,设置表格样式,章节,页眉页脚等