版权声明:本文为openXu原创文章【openXu的博客】,未经博主允许不得以任何形式转载

目录:

文章目录

  • 1. CombineLatest
  • 2. Join
  • 3. Merge
  • 4. StartWith
  • 5. Switch
  • 6. Zip

结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符:

1. CombineLatest

  当两个Observables中的任何一个发射数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射出一个新的数据。
  好比工厂的流水线,下面一件产品需要两个流水线上的零件组合,流水线1的工人生产了一个零件,只有等流水线2的工人生产了另一个零件的时候,才能组合成一个产品;流水线2的工人速度快一些,很快生产了第二个零件,这时候流水线1的工人还没有生产第二个零件,流水线2的工人就会拿流水线1的第一个零件将就用着合成第二个产品。这个例子只是方便理解,我们假设零件可以复用。仔细看下图,应该就能明白这个步骤了:

    

  CombineLatest操作符能接受2~9个Observable或者一个Observable集合作为参数,当其中一个Observable要发射数据时,它会用传入的Func函数对每个Observable最近发射的数据进行组合后发射一个新的数据。这里有两个规则:

  • 所有的Observable必须都发射过数据,如果其中一个Observable从来没发射过数据,将不会组合发射新的数据;
  • 满足上面条件之后,当其中任何一个Observable要发射数据时,就会调用Func函数对所有Observable最近发射的数据进行组合(每个Observable贡献一个),然后发射出去。

    

示例代码:

//创建不同名称的Observable(每隔100ms发射一个数据 ):
private Observable<String> getObservable(String name){return Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {if(name.contains("-")){for (int i = 1; i <=3; i++) {Log.v(TAG, name+i);subscriber.onNext(name+i);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}subscriber.onCompleted();}}}).subscribeOn(Schedulers.newThread());
}Observable.combineLatest(getObservable("one->"), getObservable("two->"), getObservable("three->"),new Func3<String, String, String,String>() {//使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值@Overridepublic String call(String str1, String str2, String str3) {return str1+","+str2+","+str3;}
}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v(TAG, "combineLatest:"+s);}
});

输出:

one->1
two->1
three->1
combineLatest:one->1,two->1,three->1
one->2
combineLatest:one->2,two->1,three->1
two->2
combineLatest:one->2,two->2,three->1
three->2
combineLatest:one->2,two->2,three->2
one->3
combineLatest:one->3,two->2,three->2
two->3
combineLatest:one->3,two->3,three->2
three->3
combineLatest:one->3,two->3,three->3

从log可以看出,当one和two发射第一条数据的时候,并没有组合,因为要等所有的Observable都发射过数据,当three发射第一条数据的时候,Func会组合三个Observable最近发射的数据组合后发射。然后one要发射第二条数据了,Func会拿one的第二条、two的第一条、three的第一条组合;接下来应该是two要发射第二条数据,Func会拿one的第二条,two的第二条,three的第一条组合…

2. Join

  如果一个Observable发射了一条数据,只要在另一个Observable发射的数据定义的时间窗口内,就结合两个Observable发射的数据,然后发射结合后的数据。

    

  目标Observable和源Observable发射的数据都有一个有效时间限制,比如目标发射了一条数据(a)有效期为3s,过了2s后,源发射了一条数据(b),因为2s<3s,目标的那条数据还在有效期,所以可以组合为ab;再过2s,源又发射了一条数据(c),这时候一共过去了4s,目标的数据a已经过期,所以不能组合了…

使用join操作符需要4个参数,分别是:

  • 源Observable所要组合的目标Observable
  • 一个函数,接受从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射出来数据的有效期
  • 一个函数,接受从目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射出来数据的有效期
  • 一个函数,接受从源Observable和目标Observable发射来的数据,并返回最终组合完的数据。

Rxjava还实现了groupJoin,基本和join相同,只是最后组合函数的参数不同。
    

示例代码:

//目标Observable
Observable<Integer> obs1 = Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 1; i < 5; i++) {subscriber.onNext(i);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
});
//join
Observable.just("srcObs-").join(obs1,//接受从源Observable发射来的数据,并返回一个Observable,//这个Observable的生命周期决定了源Observable发射出来数据的有效期new Func1<String, Observable<Long>>() {@Overridepublic Observable<Long> call(String s) {return Observable.timer(3000, TimeUnit.MILLISECONDS);}},//接受从目标Observable发射来的数据,并返回一个Observable,//这个Observable的生命周期决定了目标Observable发射出来数据的有效期new Func1<Integer, Observable<Long>>() {@Overridepublic Observable<Long> call(Integer integer) {return Observable.timer(2000, TimeUnit.MILLISECONDS);}},//接收从源Observable和目标Observable发射来的数据,并返回最终组合完的数据new Func2<String,Integer,String>() {@Overridepublic String call(String str1, Integer integer) {return str1 + integer;}})
.subscribe(new Action1<String>() {@Overridepublic void call(String o) {Log.v(TAG,"join:"+o);}
});//groupJoin
Observable.just("srcObs-").groupJoin(obs1,new Func1<String, Observable<Long>>() {@Overridepublic Observable<Long> call(String s) {return Observable.timer(3000, TimeUnit.MILLISECONDS);}},new Func1<Integer, Observable<Long>>() {@Overridepublic Observable<Long> call(Integer integer) {return Observable.timer(2000, TimeUnit.MILLISECONDS);}},new Func2<String,Observable<Integer>, Observable<String>>() {@Overridepublic Observable<String> call(String s, Observable<Integer> integerObservable) {return integerObservable.map(new Func1<Integer, String>() {@Overridepublic String call(Integer integer) {return s+integer;}});}}).subscribe(new Action1<Observable<String>>() {@Overridepublic void call(Observable<String> stringObservable) {stringObservable.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v(TAG,"groupJoin:"+s);}});}});

输出:

join:srcObs-1
join:srcObs-2
join:srcObs-3
groupJoin:srcObs-1
groupJoin:srcObs-2
groupJoin:srcObs-3

分析:源Observable只发射了一条“srcObs-”的数据,有效期为3s,目标Observable每隔1s发射一条数据,每条数据有效期为2s。在“srcObs-”有效期间,obs1一共发射了三条数据,都能结合,最后“srcObs-”过期了,obs1发射的数据就舍弃了,所以一共输出三条。

3. Merge

  使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

Merge操作符有两种:

  • merge:任何一个原始Observable的onError通知会被立即传递给观察者,而且会终止合并后的Observable。
        
  • mergeDelayError: mergeDelayError操作符会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。
        

示例代码:

/** merge:当其中一个Observable发生onError时,就会终止发射数据,并将OnError传递给观察者*/
Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {Log.v(TAG, "merge Next: " + item);}@Overridepublic void onError(Throwable error) {Log.e(TAG, "merge Error: " + error.getMessage());}@Overridepublic void onCompleted() {Log.v(TAG, "merge Sequence complete.");}});/** mergeDelayError:当发生onError时,会等待其他Observable将数据发射完,然后才将onError发送个观察者*/
Observable.mergeDelayError(Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 0; i < 5; i++) {if (i == 3) {subscriber.onError(new Throwable("第一个发射onError了"));}subscriber.onNext(i);}}
}), Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for (int i = 10; i < 15; i++) {subscriber.onNext(i);}subscriber.onCompleted();}
})).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {Log.v(TAG, "mergeDelayError Next: " + item);}@Overridepublic void onError(Throwable error) {Log.e(TAG, "mergeDelayError Error: " + error.getMessage());}@Overridepublic void onCompleted() {Log.v(TAG, "mergeDelayError Sequence complete.");}
});

输出:

merge Next: 1
merge Next: 3
merge Next: 5
merge Next: 2
merge Next: 4
merge Next: 6
merge Sequence complete.

mergeDelayError Next: 0
mergeDelayError Next: 1
mergeDelayError Next: 2
mergeDelayError Next: 3
mergeDelayError Next: 4
mergeDelayError Next: 10
mergeDelayError Next: 11
mergeDelayError Next: 12
mergeDelayError Next: 13
mergeDelayError Next: 14
mergeDelayError Error: 第一个发射onError了

4. StartWith

  startWith操作符可以在Observable在发射数据之前先发射一个指定的数据序列。它可以接受一个Iterable或者多个Observable作为函数的参数。
  如果我们传递一个Observable给startWith,它会将这个Observable的数据插在原始Observable发射的数据序列之前。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)
    
示例代码:

/** 插入一个Observable*/
Observable<Integer> obs1 = Observable.just(1, 2, 3);
Observable<Integer> obs2 = Observable.just(4, 5, 6);
obs1.startWith(obs2).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {Log.v(TAG, "onNext:"+integer);}
});/** 插入数据序列(最多接受9个参数)*/
Observable<String> obs3 = Observable.just("c","d","e");
obs3.startWith("a", "b").subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v(TAG, "onNext:"+s);}
});

输出:

onNext:4
onNext:5
onNext:6
onNext:1
onNext:2
onNext:3

onNext:a
onNext:b
onNext:c
onNext:d
onNext:e

5. Switch

  将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。
   Switch订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。

  注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后,前一个Observable发射的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

    

示例代码:

Observable.switchOnNext(Observable.create(new Observable.OnSubscribe<Observable<Long>>() {@Overridepublic void call(Subscriber<? super Observable<Long>> subscriber) {for (int i = 1; i < 3; i++) {//每隔1s发射一个小Observable。小Observable每1s发射一个整数//第一个小Observable将发射6个整数,第二个将发射3个整数subscriber.onNext(Observable.interval(1000, TimeUnit.MILLISECONDS).take(i==1?6:3));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
)).subscribe(new Action1<Long>() {@Overridepublic void call(Long s) {Log.v(TAG, "onNext:"+s);}
});

输出:

onNext:0
onNext:0
onNext:1
onNext:2

从Log可以看出,第一个Observable每隔1s发射一个数据,总共发射6条数据;第二个Observable正好在第一个Observable发射1的时候产生了,这时候将不再订阅第一个Observable,所以第一个Observable只发射了一个0,后面的5个数据都被舍弃了。

6. Zip

  通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
    
  Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序发射数据。它只发射与发射数据项最少的那个Observable一样多的数据。
RxJava将这个操作符实现为zip(static)和zipWith(非static):

  • zip

    • Javadoc: zip(Iterable,FuncN)
    • Javadoc: zip(Observable,FuncN)
    • Javadoc: zip(Observable,Observable,Func2) (最多可以有九个Observables参数)

  zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。
    

  • zipWith

    • Javadoc: zipWith(Observable,Func2)
    • Javadoc: zipWith(Iterable,Func2)

  zipWith和zip的区别是zipWith不是static的,它必须由一个Observable对象调用,zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable。
    

示例代码:

Observable obs1 = Observable.just(1,2,3,4);
Observable obs2 = Observable.just(10,20,30,40);
/** zip(Observable,FuncN):* ①.能接受1~9个Observable作为参数,或者单个Observables列表作为参数;*    Func函数的作用就是从每个Observable中获取一个数据进行结合后发射出去;* ②.小Observable的每个数据只能组合一次,如果第二个小Observable发射数据的时候,*    第一个还没有发射,将要等待第一个发射数据后才能组合;*/
Observable.zip(obs1, obs2,new Func2<Integer, Integer, String>() {//使用一个函数结合每个小Observable的一个数据(每个数据只能组合一次),然后发射这个函数的返回值@Overridepublic String call(Integer int1, Integer int2) {return int1+"-"+int2;}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v(TAG, "zip:"+s);}
});/** zipWith(Observable,Func2):* ①.zipWith不是static的,必须由一个Observable对象调用* ②.如果要组合多个Observable,可以传递Iterable*/
obs1.zipWith(obs2, new Func2<Integer, Integer, String>() {//使用一个函数结合每个小Observable的一个数据(每个数据只能组合一次),然后发射这个函数的返回值@Overridepublic String call(Integer int1, Integer int2) {return int1+"-"+int2;}
}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v(TAG, "zipWith:"+s);}
});

输出:

zip:1-10
zip:2-20
zip:3-30
zip:4-40

zipWith :1-10
zipWith :2-20
zipWith :3-30
zipWith :4-40

**有问题请留言,有帮助请点赞(^__^)**

#源码下载:

https://github.com/openXu/RxJavaTest

RxJava操作符(05-结合操作)相关推荐

  1. Rxjava操作符之过滤操作

    前言: 本文将介绍以下过滤类操作符(基于Rxjava2.0): filter ofType take takeLast first firstOrError last lastOrError skip ...

  2. RxJava操作符(四)Combining

    RxJava操作符(四)Combining 原文链接 http://blog.chinaunix.net/uid-20771867-id-5197584.html 上一篇文章中我们了解了如何对数据进行 ...

  3. Android RxJava操作符的学习---创建操作符

    RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求 1. 简介 RxJava 操作符的具体简介如下: 2. 类型 RxJava功能强大,所以其对应的 ...

  4. RxJava操作符学习APP

    用于学习RxJava操作符的app 下载地址: fir.im http://fir.im/bpdu 或者直接在 Release里面下载 https://github.com/jiang111/RxJa ...

  5. Android RxJava操作符的学习---过滤操作符

    3.5 过滤操作符 作用 过滤 / 筛选 被观察者(Observable)发送的事件 & 观察者 (Observer)接收的事件 应用场景 根据 指定条件 过滤事件 根据 指定事件数量 过滤事 ...

  6. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  7. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  8. (转载)如何理解RxJava中的join操作

    转载:http://avenwu.net/2016/05/10/understand-the-join-operation-in-rx/ 前言 先前写过一篇文章,介绍Rx中不容易理解的概念(Rx那些不 ...

  9. Android RxJava操作符的学习---功能性操作符

    3.4 功能性操作符 作用 辅助被观察者(Observable) 在发送事件时实现一些功能性需求 实际应用场景 连接(订阅) 观察者 & 被观察者 线程调度(切换) 错误处理 事件生命周期操作 ...

最新文章

  1. Build-dep linux 知乎,删除通过apt-get build-dep安装的软件包
  2. 成功之路该如何走--工作半年的思考
  3. Java每天5道面试题,跟我走,offer有!(八)
  4. Lesson 16.3 卷积操作
  5. 副部长和消费者拼单 拼多多发放农民丰收节交易会消费券
  6. .net开发微信公众平台
  7. 通过hibernate去调用存储过程
  8. 能用机器完成的,千万别堆工作量|持续集成中的性能自动化测试
  9. 弹出选择文件夹的对话框 BROWSEINFO 的用法【MFC】
  10. Ubuntu 中将Python3 置为默认版本
  11. iOS 日志管理异常捕获组件LFLogManager
  12. 阐述Spring框架中Bean的生命周期?
  13. java提取图片文字
  14. HC05蓝牙模块 修改密码 返回 ERROR:(1D)解决办法
  15. _spellmod_leech_spell
  16. 机器学习学习过程记录
  17. android 通过adb工具实现无线连接安卓设备并调试
  18. 如何写一个 Burp 插件
  19. 干货 | Logstash自定义正则表达式ETL实战
  20. 小程序判断是否有上一页

热门文章

  1. day22Java-JDK8新特性演示
  2. html5 麦克风事件,HTML5 Audio 麦克风 声音触发亮光
  3. EasyExcel导出Excel 自定义导出时间格式
  4. 天府大地艺术季精彩再续·晨光社区农耕节踏春而来
  5. 前导问题word使用技巧---解决Word 生成目录时前导符不一致的问题(即通常所谓的目录中省略号大小不一致)
  6. 国外的用户发现无法访问迅雷看看
  7. python读xml文件生成头文件_python如何读取生成voc xml格式标注信息
  8. 使用yarn install时报错:00h00m00s 0/0: : ERROR: [Errno 2] No such file or directory: ‘install‘
  9. 商城网店网页设计方案
  10. 马化腾:明年将专注于开放平台和移动产品