本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处

CSDN学院课程地址

  • RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10036
  • RxJava2从入门到精通-中级篇:https://edu.csdn.net/course/detail/10037
  • RxJava2从入门到精通-进阶篇:https://edu.csdn.net/course/detail/10038
  • RxJava2从入门到精通-源码分析篇:https://edu.csdn.net/course/detail/10138

3. RxJava操作符

RxJava操作符也是其精髓之一,可以通过一个简单的操作符,实现复杂的业务逻辑,甚至还可以将操作符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。比如我们前面用到的.create().subscribeOn().observeOn().subscribe()都是RxJava的操作符之一,下面我们将对RxJava的操作符进行分析

掌握RxJava操作符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面我们通过例子说明

这张图片我们先要分清楚概念上的东西,上下两行横向的直线区域代表着事件流,上面一行(上游)是我们的被观察者Observable,下面一行(下游)是我们的观察者Observer,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是我们的操作符部分,它可以对我们的数据进行变换操作。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不同的形状代表着不同的数据类型

这张图片并不是表示没有被观察者Observable,而是Create方法本身就是创建了被观察者,所以可以将被观察者的上游省略。在进行事件的onNext()分发后,执行onComplete()事件,这样就表示事件流已经结束,后续如果上游继续发事件,则下游表示不接收。当事件流的onCompleted()或者onError()正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法

在理解RxJava操作符之前,需要将这几个概念弄明白,整个操作符的章节都是围绕这几个概念进行的

  • 事件流:通过发射器发射的事件,从发射事件到结束事件的过程,这一过程称为事件流
  • 数据流:通过发射器发射的数据,从数据输入到数据输出的过程,这一过程称为数据流
  • 被观察者:事件流的上游,即Observable,事件流开始的地方和数据流发射的地方
  • 观察者:事件流的下游,即Observer,事件流结束的地方和数据流接收的地方

3.1 Creating Observables (创建操作符)

1、create

Observable最原始的创建方式,创建出一个最简单的事件流,可以使用发射器发射特定的数据类型

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {e.onNext(i);}e.onComplete();}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onComplete

2、from

创建一个事件流并发出特定类型的数据流,其发射的数据流类型有如下几个操作符

public static void main(String[] args) {Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

3、just

just操作符和from操作符很像,只是方法的参数有所差别,它可以接受多个参数

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

4、defer

defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操作符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会创建它缓存的事件流

public static void main(String[] args) {i = 10;Observable<Integer> just = Observable.just(i, i);Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {@Overridepublic ObservableSource<?> call() throws Exception {//缓存新的事件流return Observable.just(i, i);}});i = 15;just.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});i = 20;defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});
}

输出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20

5、interval

interval操作符是按固定的时间间隔发射一个无限递增的整数数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行

public void interval() {Observable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
......

6、range

range操作符发射一个范围内的有序整数数据流,你可以指定范围的起始和长度

public static void main(String[] args) {Observable.range(1, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

7、repeat

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行

public static void main(String[] args) {Observable.just(1).repeat(5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1

8、timer

timer操作符可以创建一个延时的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行,默认在computation调度器上执行

public void timer() {Observable.timer(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=0

9、小结

  1. create():创建最简单的事件流
  2. from():创建事件流,可发送不同类型的数据流
  3. just():创建事件流,可发送多个参数的数据流
  4. defer():创建事件流,可缓存可激活事件流
  5. interval():创建延时重复的事件流
  6. range():创建事件流,可发送范围内的数据流
  7. repeat():创建可重复次数的事件流
  8. timer():创建一次延时的事件流

补充:interval()、timer()、delay()的区别

  1. interval():用于创建事件流,周期性重复发送
  2. timer():用于创建事件流,延时发送一次
  3. delay():用于事件流中,可以延时某次事件流的发送

3.2 Transforming Observables (转换操作符)

1、map

map操作符可以将数据流进行类型转换

public static void main(String[] args) {Observable.just(1).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return "发送过来的数据会被变成字符串" + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}

输出

onNext=发送过来的数据会被变成字符串1

2、flatMap

flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void flatMap() {Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(UserParams userParams) throws Exception {return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}});
}public static class UserParams {public UserParams(String username, String password) {this.username = username;this.password = password;}public String username;public String password;
}

输出

hensen登录成功

补充:

  • concatMap与flatMap功能一样,唯一的区别就是concatMap是有序的,flatMap是乱序的

3、groupBy

groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。

public static void main(String[] args) {Observable.just("java", "c++", "c", "c#", "javaScript", "Android").groupBy(new Function<String, Character>() {@Overridepublic Character apply(String s) throws Exception {return s.charAt(0);//按首字母分组}}).subscribe(new Consumer<GroupedObservable<Character, String>>() {@Overridepublic void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {//排序后,直接订阅输出key和valuecharacterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);}});}});
}

输出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript

4、scan

scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}

输出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1

5、buffer

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer(5).subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}
});

输出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]

6、window

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).window(2, 1).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Exception {integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});}});
}

输出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9

7、小结

  1. map():对数据流的类型进行转换
  2. flatMap():对数据流的类型进行包装成另一个数据流
  3. groupby():对所有的数据流进行分组
  4. scan():对上一轮处理过后的数据流进行函数处理
  5. buffer():缓存发射的数据流到一定数量,随后发射出数据流集合
  6. window():缓存发射的数据流到一定数量,随后发射出新的事件流

3.3 Filtering Observables (过滤操作符)

1、debounce

debounce操作符会去过滤掉发射速率过快的数据项,下面的例子onNext事件可以想象成按钮的点击事件,如果在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则允许触发点击事件。这就有点像我们频繁点击按钮,但始终只会触发一次点击事件,这样就不会导致重复去响应点击事件

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 100; i++) {if (i % 3 == 0) {Thread.sleep(3000);} else {Thread.sleep(1000);}emitter.onNext(i);}}}).debounce(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......

2、distinct

distinct操作符会过滤重复发送的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).distinct().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4

3、elementAt

elementAt操作符只取指定的角标的事件

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1

4、filter

filter操作符可以过滤指定函数的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=3
onNext=4
onNext=3

5、first

first操作符只发射第一项数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).first(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1

6、ignoreElements

ignoreElements操作符不发射任何数据,只发射事件流的终止通知

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).ignoreElements().subscribe(new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onComplete

7、last

last操作符只发射最后一项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).last(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=3

8、sample

sample操作符会在指定的事件内从数据项中采集所需要的数据,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void sample() {Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=2
onNext=4
onNext=6
onNext=8

9、skip

skip操作符可以忽略事件流发射的前N项数据项,只保留之后的数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skip(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}

输出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8

10、skipLast

skipLast操作符可以抑制事件流发射的后N项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skipLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

11、take

take操作符可以在事件流中只发射前面的N项数据

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).take(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}

输出

onNext=1
onNext=2
onNext=3

12、takeLast

takeLast操作符事件流只发射数据流的后N项数据项,忽略前面的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).takeLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}

输出

onNext=6
onNext=7
onNext=8

还有一个操作符叫takeLastBuffer,它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个

13、小结

  1. debounce():事件流只发射规定范围时间内的数据项
  2. distinct():事件流只发射不重复的数据项
  3. elementAt():事件流只发射第N个数据项
  4. filter():事件流只发射符合规定函数的数据项
  5. first():事件流只发射第一个数据项
  6. ignoreElements():忽略事件流的发射,只发射事件流的终止事件
  7. last():事件流只发射最后一项数据项
  8. sample():事件流对指定的时间间隔进行数据项的采样
  9. skip():事件流忽略前N个数据项
  10. skipLast():事件流忽略后N个数据项
  11. take():事件流只发射前N个数据项
  12. takeLast():事件流只发射后N个数据项

3.4 Combining Observables (组合操作符)

1、merge/concat

merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。由于concat操作符和merge操作符的效果是一样的,这里只举一例

merge和concat的区别

  • merge():合并后发射的数据项是无序的
  • concat():合并后发射的数据项是有序的
public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {@Overridepublic void accept(Serializable serializable) throws Exception {System.out.println("onNext=" + serializable.toString());}});
}

输出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

2、zip

zip操作符是将两个数据流进行指定的函数规则合并

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.zip(just1, just2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {return s + s2;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}

输出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5

3、startWith

startWith操作符是将另一个数据流合并到原数据流的开头

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");just1.startWith(just2).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E

4、join

join操作符是有时间期限的合并操作符,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void join() {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);just1.join(just2, new Function<String, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(String s) throws Exception {return Observable.timer(3, TimeUnit.SECONDS);}}, new Function<Long, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(Long l) throws Exception {return Observable.timer(8, TimeUnit.SECONDS);}}, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}

join操作符有三个函数需要设置

  • 第一个函数:规定just2的过期期限
  • 第二个函数:规定just1的过期期限
  • 第三个函数:规定just1和just2的合并规则

由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1

5、combineLatest

conbineLatest操作符会寻找其他事件流最近发射的数据流进行合并,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public static String[] str = {"A", "B", "C", "D", "E"};public void combineLatest() {Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {return str[(int) (aLong % 5)];}});Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}

输出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5

6、小结

  1. merge()/concat():无序/有序的合并两个数据流
  2. zip():两个数据流的数据项合并成一个数据流一同发出
  3. startWith():将待合并的数据流放在自身前面一同发出
  4. join():将数据流进行排列组合发出,不过数据流都是有时间期限的
  5. combineLatest():合并最近发射出的数据项成数据流一同发出

3.5 Error Handling Operators(错误处理操作符)

1、onErrorReturn

onErrorReturn操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

2、onErrorResumeNext

onErrorResumeNext操作符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {return Observable.just(-1);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

3、onExceptionResumeNext

onExceptionResumeNext操作符表示当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onException crash"));//e.onError(new Error("onError crash"));}e.onNext(i);}}}).onExceptionResumeNext(new ObservableSource<Integer>() {@Overridepublic void subscribe(Observer<? super Integer> observer) {//备用事件流observer.onNext(8);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=8

4、retry

retry操作符表示当错误发生时,发射器会重新发射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retry(1).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
  • retry():表示重试无限次
  • retry(long times):表示重试指定次数
  • retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试

5、retryWhen

retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

private static int retryCount = 0;
private static int maxRetries = 2;public void retryWhen(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {if (++retryCount <= maxRetries) {// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);return Observable.timer(1, TimeUnit.SECONDS);}return Observable.error(throwable);}});}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}

输出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

6、小结

  • onErrorReturn():当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
  • onErrorResumeNext():当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
  • onExceptionResumeNext():当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
  • retry():当错误发生时,发射器会重新发射
  • retryWhen():当错误发生时,根据Tharowble类型决定发射器是否重新发射

3.6 Observable Utility Operators(辅助性操作符)

1、delay

delay操作符可以延时某次事件发送的数据流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void deley() {Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

delay和delaySubscription的效果是一样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时

2、do

do操作符可以监听整个事件流的生命周期,do操作符分为多个类型,而且每个类型的作用都不同

  1. doOnNext():接收每次发送的数据项
  2. doOnEach():接收每次发送的数据项
  3. doOnSubscribe():当事件流被订阅时被调用
  4. doOnDispose():当事件流被释放时被调用
  5. doOnComplete():当事件流被正常终止时被调用
  6. doOnError():当事件流被异常终止时被调用
  7. doOnTerminate():当事件流被终止之前被调用,无论正常终止还是异常终止都会调用
  8. doFinally():当事件流被终止之后被调用,无论正常终止还是异常终止都会调用
public static void main(String[] args) {Observable.just(1, 2, 3).doOnNext(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("doOnNext");}}).doOnEach(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("doOnEach");}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {System.out.println("doOnSubscribe");}}).doOnDispose(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnDispose");}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnTerminate");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("doOnError");}}).doOnComplete(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnComplete");}}).doFinally(new Action() {@Overridepublic void run() throws Exception {System.out.println("doFinally");}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally

3、materialize/dematerialize

materialize操作符将发射出的数据项转换成为一个Notification对象,而dematerialize操作符则是跟materialize操作符相反,这两个操作符有点类似我们Java对象的装箱和拆箱功能

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).materialize().subscribe(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("onNext=" + integerNotification.getValue());}});Observable.just(1, 2, 3, 4, 5).materialize().dematerialize().subscribe(new Consumer<Object>() {@Overridepublic void accept(Object object) throws Exception {System.out.println("onNext=" + object.toString());}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

输出的时候,materialize会输出多个null,是因为null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个原因也可以从图片中看出来

4、serialize

serialize操作符可以将异步执行的事件流进行同步操作,直到事件流结束

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).serialize().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

5、timeInterval

timeInterval操作符可以将发射的数据项转换为带有时间间隔的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeInterval(){Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}

输出

onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2

6、timeout

timeout操作符表示当发射的数据项超过了规定的限制时间,则发射onError事件,这里直接让程序超过规定的限制时间,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeOut(){Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}});
}

输出

onError

7、timestamp

timestamp操作符会给每个发射的数据项带上时间戳,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void timeStamp() {Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}

输出

onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132

8、using

using操作符可以让你的事件流存在一次性的数据项,即用完就将资源释放掉

using操作符接受三个参数:

  • 一个用户创建一次性资源的工厂函数
  • 一个用于创建一次性事件的工厂函数
  • 一个用于释放资源的函数
public static class UserBean {String name;int age;public UserBean(String name, int age) {this.name = name;this.age = age;}
}public static void main(String[] args) {Observable.using(new Callable<UserBean>() {@Overridepublic UserBean call() throws Exception {//从网络中获取某个对象return new UserBean("俊俊俊", 22);}}, new Function<UserBean, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(UserBean userBean) throws Exception {//拿出你想要的资源return Observable.just(userBean.name);}}, new Consumer<UserBean>() {@Overridepublic void accept(UserBean userBean) throws Exception {//释放对象userBean = null;}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}

输出

onNext=俊俊俊

9、to

to操作符可以将数据流中的数据项进行集合的转换,to操作符分为多个类型,而且每个类型的作用都不同

  1. toList():转换成List类型的集合
  2. toMap():转换成Map类型的集合
  3. toMultimap():转换成一对多(即<A类型,List<B类型>>)的Map类型的集合
  4. toSortedList():转换成具有排序的List类型的集合
public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).toList().subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}});
}

输出

onNext=[1, 2, 3, 4, 5]

10、小结

  1. delay():延迟事件发射的数据项
  2. do():监听事件流的生命周期
  3. materialize()/dematerialize():对事件流进行装箱/拆箱
  4. serialize():同步事件流的发射
  5. timeInterval():对事件流增加时间间隔
  6. timeout():对事件流增加限定时间
  7. timestamp():对事件流增加时间戳
  8. using():对事件流增加一次性的资源
  9. to():对数据流中的数据项进行集合的转换

3.7 Conditional and Boolean Operators(条件和布尔操作符)

1、all

all操作符表示对所有数据项进行校验,如果所有都通过则返回true,否则返回false

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 0;}}).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}

输出

onNext=true

2、contains

contains操作符表示事件流中发射的数据项当中是否包含有指定的数据项

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).contains(2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}

输出

onNext=true

3、amb

amb操作符在多个事件流中只发射最先发出数据的事件流,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void amb(){List<Observable<Integer>> list = new ArrayList<>();list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));Observable.amb(list).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=7
onNext=8
onNext=9

4、defaultIfEmpty

defaultIfEmpty操作符会在事件流没有发射任何数据时,发射一个指定的默认值

public static void main(String[] args) {Observable.empty().defaultIfEmpty(-1).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}

输出

onNext=-1

5、sequenceEqual

sequenceEqual操作符可以判断两个数据流是否完全相等

public static void main(String[] args) {Observable<Integer> just1 = Observable.just(1, 2, 3);Observable<Integer> just2 = Observable.just(1, 2, 3);Observable.sequenceEqual(just1, just2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}

输出

onNext=true

6、skipUntil/skipWhile

skipUtils操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流才开始发射出数据项,它会忽略之前发射过的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void skipUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.skipUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=2
onNext=3
onNext=4
onNext=5
......

skipWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则不发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,发射剩余的所有数据项。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).skipWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=3
onNext=4
onNext=5

7、takeUntil/takeWhile

takeUntil操作符跟skipUntil类似,skip表示跳过的意思,而take表示取值的意思,takeUntil操作符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流停止发射数据项,它会忽略之后的数据项,由于这段代码的的延时操作都是非阻塞型的,所以在Java上运行会导致JVM的立马停止,只能把这段代码放在Android来运行

public void takeUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.takeUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=0
onNext=1

takeWhile操作符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,如果判断值返回true,则发射该数据项,继续从下一个数据项执行同样的判断,直到某个数据项的判断值返回false时,则终止判断,且剩余的所有数据项不会发射。需要注意的是,这里只要一次判断为false则后面的所有数据项都不判断

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 0).takeWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2

8、小结

  1. all():对所有数据项进行校验
  2. contains():所有数据项是否包含指定数据项
  3. amb():多个事件流中,只发射最先发出的事件流
  4. defaultIfEmpty():如果数据流为空则发射默认数据项
  5. sequenceEqual():判断两个数据流是否完全相等
  6. skipUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时才进行发射
  7. skipWhile():当发射的数据流达到某种条件时,才开始发射剩余所有数据项
  8. takeUntil():当两个事件流发射时,第一个事件流的数据项会等到第二个事件流开始发射时终止发射
  9. takeWhile():当发射的数据流达到某种条件时,才停止发射剩余所有数据项

3.8 Mathematical and Aggregate Operators(数学运算及聚合操作符)

数学运算操作符比较简单,对于数学运算操作符会放在小结中介绍,下面是对聚合操作符做介绍

1、reduce

reduce操作符跟scan操作符是一样的,会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。reduce与scan的唯一区别在于reduce只输出最后的结果,而scan会输出每一次的结果,这点从图片中也能看出来

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}

输出

onNext=1

2、collect

collect操作符跟reduce操作符类似,只不过collect增加了一个可改变数据结构的函数供我们处理

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {@Overridepublic String call() throws Exception {return "A";}}, new BiConsumer<String, Integer>() {@Overridepublic void accept(String s, Integer integer) throws Exception {System.out.println("onNext=" + s + "  " + integer);}}).subscribe(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String s, Throwable throwable) throws Exception {System.out.println("onNext2=" + s);}});
}

输出

onNext=A  8
onNext=A  2
onNext=A  13
onNext=A  1
onNext=A  15
onNext2=A

3、小结

数学运算操作符的使用需要在gradle中添加rxjava-math的依赖

implementation 'io.reactivex:rxjava-math:1.0.0'
  1. average():求所有数据项的平均值
  2. max/min():求所有数据项的最大或最小值
  3. sum():求所有数据项的总和
  4. reduce():对上一轮处理过后的数据流进行函数处理,只返回最后的结果
  5. collect():对上一轮处理过后的数据流进行函数处理,可改变原始的数据结构

3.9 Connectable Observable(连接操作符)

1、publish

publish操作符是将普通的事件流转化成可连接的事件流ConnectableObservable,它与普通的事件流不一样,ConnectableObservable在没有调用connect()进行连接的情况下,事件流是不会发射数据的

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

2、connect

connect操作符是将可连接的事件流进行连接并开始发射数据。这个方法需要注意的是,connect操作符必须在所有事件流被订阅后才开始发射数据。如果放在subscribe之前的话,则订阅者是无法收到数据的。如果后面还有订阅者将订阅此次事件流,则会丢失已经调用了connect后,发射出去的数据项

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});connectableObservable.connect();
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

3、refCount

refCount操作符可以将可连接的事件流转换成普通的事件流

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.refCount().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}

输出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5

4、replay

replay操作符将弥补connect操作符的缺陷,由于connect会让后面进行订阅的订阅者丢失之前发射出去的数据项,所以使用replay操作符可以将发射出去的数据项进行缓存,这样使得后面的订阅者都可以获得完整的数据项。这里需要注意的是,replay操作符不能和publish操作符同时使用,否则将不会发射数据。例子中,读者可以将replay操作符换成publish操作符,这时候的输出就会丢失前2秒发射的数据项

public void replay(){ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();connectableObservable.connect();connectableObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}

输出

onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......

5、小结

  1. publish():将普通的事件流转换成可连接的事件流
  2. connect():将可连接的事件流进行连接并发射数据
  3. refCount():将可连接的事件流转换成普通的事件流
  4. replay():缓存可连接的事件流中的所有数据项

3章 RxJava操作符相关推荐

  1. RxJava操作符(四)Combining

    RxJava操作符(四)Combining 原文链接 http://blog.chinaunix.net/uid-20771867-id-5197584.html 上一篇文章中我们了解了如何对数据进行 ...

  2. RxJava操作符学习APP

    用于学习RxJava操作符的app 下载地址: fir.im http://fir.im/bpdu 或者直接在 Release里面下载 https://github.com/jiang111/RxJa ...

  3. Rxjava操作符之过滤操作

    前言: 本文将介绍以下过滤类操作符(基于Rxjava2.0): filter ofType take takeLast first firstOrError last lastOrError skip ...

  4. 第十四章 重载操作符与转换

    code: /*第14章 重载操作符与转换14.1 重载操作符的定义 14.2 输入和输出操作符 14.3 算术操作符和关系操作符 14.4 赋值操作符 14.5 下标操作符 14.6 成员访问操作符 ...

  5. Android RxJava操作符的学习---创建操作符

    RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求 1. 简介 RxJava 操作符的具体简介如下: 2. 类型 RxJava功能强大,所以其对应的 ...

  6. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  7. RxJava操作符(三)Filtering

    在上一篇文章里,我们了解了转化操作符,能将数据转化为我们想要的格式,但是如果数据集合里面有一些我们想要过滤掉的数据怎么办?这时候我们就需要使用过滤操作符了,有点类似于sql里的where,让Obser ...

  8. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  9. 4章 RxJava基本响应类型

    CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10036 RxJava2从入门到精通-中级篇:https://edu. ...

  10. 9章 RxJava混合实战

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址 RxJava2从入门到精通-初级篇:https://edu.csdn.net/course/detail/10 ...

最新文章

  1. pandas使用cut函数基于分位数进行连续值分箱(手动计算分位数)处理后出现NaN值原因及解决
  2. perl5 第十章 格式化输出
  3. RIP和OSPF双点双向重发布_综合实验
  4. c语言万年历查询程序代码,C语言 万年历程序(示例代码)
  5. php的通用变量,认识并使用PHP的全局变量
  6. 特征金字塔 Feature Pyramid Networks for Object Detection
  7. 【Java】java中 ==,equals,hashcode
  8. 背包问题九讲笔记_01背包
  9. JSP内置对象实例实训报告
  10. 网络通信——下载管理器DownloadManager——利用POST方式上传文件
  11. qq游戏大厅中解析不安装apk的研究
  12. 范宝兴:幻方与类自然数幻方(上)「片桐善直8阶间隔幻方」「同心6阶/8阶/10阶」...
  13. Rainbow portal 研究
  14. overflow清楚浮动的影响
  15. WPT2F06-3/TR通用晶体管PNP 设计放大器应用WILLSEM
  16. 详解微信支付中的异步通知
  17. 一个密码本(ACodebook)使用说明
  18. 怎么汇总多张表格数据
  19. Vue2 必备的50个知识点
  20. 工程物料管理信息化建设(三)——再说材料编码

热门文章

  1. draft.js编辑器开发笔记
  2. HikariCP对各Java版本的支持
  3. 物料分拣系统matlab仿真,基于PLC的物料分拣控制系统设计与仿真(含梯形图)
  4. 银联网关支付,退款java实现
  5. sublime 快捷键
  6. 英诚医院内部网络规划与设计
  7. Java 生成随机数并进行查找
  8. mysql数据库自定义输入法_如何使用Windows10自带输入法添加词库,方便打字
  9. 员工转正述职答辩问什么问题_新员工转正述职答辩PPT
  10. t-SNE原理与推导