简要:

需求了解:

对于 Observable 发射的数据有的时候可能不满足我们的要求,或者需要转化为其他类型的数据,比如:缓存,数据类型转化,数据拦截等。此时可以使用 Rx 中的一些对于数据操作的操作进行数据的变换,方便我们的开发。

执行变换的操作方法:

Buffer:它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

Map:对序列的每一项都应用一个函数来变换Observable发射的数据序列

FlatMap,FlatMapIterable,ConcatMap:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平铺化的放进一个单独的 Observable

SwitchMap:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据

Window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项

GroupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按 Key

分组,每一个Observable发射一组不同的数据

Scan:对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值

Cast:在发射之前强制将Observable发射的所有数据转换为指定类型

1. Buffer

定期收集Observable的数据放进一个数据包裹(缓存),然后发射这些数据包裹,而不是一次发射一个值。

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生 的Observable发射这些数据的缓存集合。Buffer操作符在很多语言特定的实现中有很多种变 体,它们在如何缓存这个问题上存在区别。

Window操作符与Buffer类似,但是它在发射之前把收集到的数据放进单独的Observable, 而不是放进一个数据结构。

注意: 如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

在RxJava中的一些Buffer的操作如下:

1.1 buffer(count)

以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始 Observable 的 count 项数据(最后发射的列表数据可能少于count项)。

实例代码:

// 1. buffer(count)

// 以列表(List)的形式发射非重叠的缓存,

// 每一个缓存至多包含来自原始 Observable的count项数据(最后发射的列表数据可能少于count项)

Observable.range(1, 10)

.buffer(3)

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> bufferr(1) accept: " + t);

}

});

输出:

--> bufferr(1) accept: [1, 2, 3]

--> bufferr(1) accept: [4, 5, 6]

--> bufferr(1) accept: [7, 8, 9]

--> bufferr(1) accept: [10]

1.2 buffer(boundary)

开始创建一个List 收集原始 Observable 数据,监视一个名叫 boundary 的Observable,每当这个Observable发射了一个值,它就创建一个新的List开始收集来自原始Observable的数据并发射原来已经收集数据的 List, 当 boundary Observable 发送了完成通知,会将此时还未发送的 List 发送。

注意: 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始 Observable 数据。

实例代码:

// 2. buffer(boundary) 监视一个名叫boundary的Observable,

// 开始创建一个List收集原始 Observable 数据,监视一个名叫boundary的Observable,

// 每当这个Observable发射了一个值,它就创建一个新的List开始收集来自原始Observable的数据并发射原来已经收集数据的List,

// 当boundary发送了完成通知,会将此时还未发送的 List 发送。

// 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始Observable数据。

Observable.range(1, 10000)

.buffer(Observable.timer(1, TimeUnit.MILLISECONDS)) // 1毫秒后开始接受原始数据

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> accept(2): " + t.size());// 每次收集的数据序列个数

}

});

输出:

--> accept(2): 2858

--> accept(2): 5471

1.3 buffer(count, skip)

从原始Observable的第一项数据开始创建新的缓存,此后每当收 到 skip 项数据,用 count 项数据填充缓存:开头的一项和后续的count-1项,它以列表 (List)的形式发射缓存,取决于count和skip的值,这些缓存可能会有重叠部分(比如skip< count时),也可能会有间隙(比如skip>count时)。

解析: 在指定的数据序列中移动指针来获取缓存数据:指针每次移动 skip 个数据长度,每次缓存指针位置及后面count个数据,指针初始位置在原始数据的第一个(存在的情况下)。

实例代码:

// 3. buffer(int count, int skip)

// 在指定的数据中移动指针来获取缓存数据:指针每次移动1个数据长度,每次缓存3个数据

Observable.range(1, 5)

.buffer(3, 1)

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> bufferr(3) accept: " + t);

}

});

输出:

--> bufferr(3) accept: [1, 2, 3]

--> bufferr(3) accept: [2, 3, 4]

--> bufferr(3) accept: [3, 4, 5]

--> bufferr(3) accept: [4, 5]

--> bufferr(3) accept: [5]

1.4 buffer(timespan, TimeUnit)

定期以 List 的形式发射新的数据,在每个时间段,收集来自原始 Observable 的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的 Observale 之后开始)。还有另一个版本的buffer 接受一个 Scheduler 参数。

解析: 每隔 timespan 时间段以 List 的形式收集原始Observable的数据

实例代码:

// 4. buffer(long timespan, TimeUnit unit)

// 每隔timespan时间段以list的形式收集数据

Observable.range(1, 50000)

.buffer(1, TimeUnit.MILLISECONDS)// 每隔1毫秒收集一次原始序列数据

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> bufferr(4) accept: " + t.size());// 每次收集的数据序列个数

}

});

输出:

--> bufferr(4) accept: 2571

--> bufferr(4) accept: 5457

--> bufferr(4) accept: 13248

--> bufferr(4) accept: 12755

--> bufferr(4) accept: 9543

--> bufferr(4) accept: 6426

注意: buffer(timespan,TimeUnit) 默认情况下会使用 computation 调度器

Javadoc:buffer(timespan,TimeUnit)

Javadoc:buffer(timespan,TimeUnit,Scheduler)

1.5 buffer(timespan, TimeUnit, count)

每当收到来自原始 Observable 的 count 项数据,或者每过了一段指定 timespan 时间后,就以List 的形式发射这期间的数据,即使数据项少于 count 项。还有另一个版本的buffer接受一个Scheduler 参数。

实例代码:

// 5. buffer(long timespan, TimeUnit unit, int count)

// 每隔1毫秒缓存50个数据

Observable.range(1, 1000)

.buffer(1, TimeUnit.MILLISECONDS, 50)// 每隔1毫秒收集50个数据序列

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> bufferr(5) accept: " + t.size());// 每次收集的数据序列个数

}

});

输出:

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 20

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 4

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 50

--> bufferr(5) accept: 26

注意: buffer(timespan, TimeUnit, count) 默认情况下会使用 computation 调度器

Javadoc: buffer(timespan, TimeUnit, count)

Javadoc: buffer(timespan, TimeUnit, scheduler, count)

1.6 buffer(timespan, timeskip, TimeUnit)

在每一个 timeskip时期内都创建一个新的 List,然后用原始 Observable 发射的每一项数据填充这个列表(在把这个 List 当做自己的数据发射前,从创建时开始,直到过了timespan这么长的时间)。如果 timespan 长于 timeskip ,它发射的数据包将会重叠,因此可能包含重复的数据项。

解析: 在每隔 timeskip 时间段都创建一个新的 List ,每个 List 都独立收集 timespan 时间段原始Observable发射的数据。 因此在 timespan 长于 timeskip 时,它发射的数据包将会重叠,因此不同 List 中可能包含重复的数据项。 还有另一个版本的 buffer接受一个 Scheduler 参数。

实例代码:

// 6. buffer(long timespan, long timeskip, TimeUnit unit)

// 在每一个timeskip时期内都创建一个新的 List,

// 每个List都独立收集timespan时间段原始Observable发射的数据,

// 如果 timespan 长于 timeskip,它发射的数据包将会重叠,因此不同List中可能包含重复的数据项

Observable.range(1, 50000)

.buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread())

.subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> accept(6): " + t.size());// 每次收集的数据序列个数

}

});

输出:

--> accept(6): 1412

--> accept(6): 733

--> accept(6): 10431

--> accept(6): 694

--> accept(6): 18944

--> accept(6): 10710

--> accept(6): 944

--> accept(6): 6132

注意:buffer(imespan, timeskip, TimeUnit) 默认情况下会使用 computation 调度器。

Javadoc: buffer(imespan, timeskip, TimeUnit)

Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)

1.7 buffer(bufferClosingSelector)

当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector生成第二个Observable,当第二个Observable 发射一个TClosing 时,buffer 发射当前的 List,然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。

注意: 它会一直这样做直到原来的Observable执行完成,可以收集完整的原始 Observable 的数据

实例代码:

// 7. buffer(Callable> boundarySupplier)

// 当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector 生成第二个Observable,

// 当第二个Observable 发射一个 TClosing 时,buffer 发射当前的 List ,

// 然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。

// 它会一直这样做直到原来的Observable执行完成。会收集完整的原始 Observable 的数据

Observable.range(1, 50000)

.buffer(new Callable>() {

@Override

public Observable call() throws Exception {

return Observable.timer(1, TimeUnit.MILLISECONDS);

}

}).subscribe(new Consumer>() {

@Override

public void accept(List t) throws Exception {

System.out.println("--> accept(7): " + t.size());// 每次收集的数据序列个数

}

});

输出:

--> accept(7): 14650

--> accept(7): 9708

--> accept(7): 25642

2. Map

对Observable发射的每一项数据应用一个函数,执行变换操作。

实例代码:

// map(Function

// 接受原始Observable的数据,发送处理后的数据

Observable.range(1, 5)

.map(new Function() {

@Override

public Integer apply(Integer t) throws Exception {

System.out.println("--> apply: " + t);

return t*t;// 计算原始数据的平方

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept Map: " + t);

}

});

输出:

--> apply: 1

--> accept Map: 1

--> apply: 2

--> accept Map: 4

--> apply: 3

--> accept Map: 9

--> apply: 4

--> accept Map: 16

--> apply: 5

--> accept Map: 25

3. FlatMap

主要对原始数据进行转换操作后发送至订阅者。

RxJava2 中的一些 FlatMap 操作方法如下:

3.1 flatMap(mapper)

FlatMap 将一个发射数据的 Observable 变换为 多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable。

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后 FlatMap 合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的 Observable发射这些次级Observable发射的数据的完整集合。

注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

在许多语言特定的实现中,还有一个操作符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操作符通常被叫作ConcatMap或者类似的名字。

实例代码:

// 1. flatMap(Function)

// 对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,

// 然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射

Observable.range(1, 5)

.flatMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(1): " + t);// 原始数据

return Observable.range(1, t).subscribeOn(Schedulers.newThread());// 处理后数据

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept flatMap(1): " + t);// 接受的所有数据

}

});

输出:

--> apply(1): 1

--> apply(1): 2

--> apply(1): 3

--> apply(1): 4

--> accept flatMap(1): 1

--> accept flatMap(1): 2

--> apply(1): 5

--> accept flatMap(1): 1

--> accept flatMap(1): 1

--> accept flatMap(1): 2

--> accept flatMap(1): 3

--> accept flatMap(1): 4

--> accept flatMap(1): 1

--> accept flatMap(1): 2

--> accept flatMap(1): 3

--> accept flatMap(1): 4

--> accept flatMap(1): 5

--> accept flatMap(1): 1

--> accept flatMap(1): 2

--> accept flatMap(1): 3

3.2 flatMap(mapper, maxConcurrency)

maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止然后再订阅另一个。

实例代码:

// 2. flatMap(Function, maxConcurrency)

// maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。

// 当达到这个限制时,它会等待其中一个终止然后再订阅另一个

Observable.range(1, 5)

.flatMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(2): " + t);

return Observable.range(1, t).subscribeOn(Schedulers.newThread());

}

// 指定最大订阅数为1,此时等待上一个订阅的Observable结束,在进行下一个Observable订阅

}, 1).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept flatMap(2): "+ t);

}

});

输出:

--> apply(2): 1

--> apply(2): 2

--> apply(2): 3

--> apply(2): 4

--> apply(2): 5

--> accept flatMap(2): 1

--> accept flatMap(2): 1

--> accept flatMap(2): 2

--> accept flatMap(2): 1

--> accept flatMap(2): 2

--> accept flatMap(2): 3

--> accept flatMap(2): 1

--> accept flatMap(2): 2

--> accept flatMap(2): 3

--> accept flatMap(2): 4

--> accept flatMap(2): 1

--> accept flatMap(2): 2

--> accept flatMap(2): 3

--> accept flatMap(2): 4

--> accept flatMap(2): 5

3.3 flatMap(mapper, delayErrors)

delayError 这个参数指定是否延迟发生 Error 的Observable通知。还有一个可以指定最大订阅数参数 maxConcurrency 的变体。

解析: 当值为 true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送 Error 这个Observable的通知,当值为 false 时则中断所有订阅的操作,并发送 Error 的通知。

实例代码:

// 3. flatMap(Function, delayErrors)

// delayErrors 这个参数指定是否延迟发生Error的Observable通知

// 当true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,

// 继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送Error这个Observable的通知

Observable.range(1, 5)

.flatMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(3): " + t);

return Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

if( t == 3) {

throw new NullPointerException("delayErrors test!");// 测试 Error

}

for (int i = 1; i <= t; i++) {

emitter.onNext(i);

}

emitter.onComplete();

}

});

}

// 设置延迟 Error 通知到最后

}, true).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept flatMap(3): "+ t);

}

},new Consumer() {

@Override

public void accept(Throwable t) throws Exception {

System.out.println("--> acceot Error(3): " + t);

}

});

输出:

--> apply(3): 1

--> accept flatMap(3): 1

--> apply(3): 2

--> accept flatMap(3): 1

--> accept flatMap(3): 2

--> apply(3): 3

--> apply(3): 4

--> accept flatMap(3): 1

--> accept flatMap(3): 2

--> accept flatMap(3): 3

--> accept flatMap(3): 4

--> apply(3): 5

--> accept flatMap(3): 1

--> accept flatMap(3): 2

--> accept flatMap(3): 3

--> accept flatMap(3): 4

--> accept flatMap(3): 5

--> acceot Error(3): java.lang.NullPointerException: delayErrors test!

3.4 flatMapIterable(mapper)

flatMapIterable这个变体成对的打包数据,然后生成 Iterable 而不是原始数据和生成的 Observables,但是处理方式是相同的。

解析: 对数据进行处理转换成 Iterable 来发射数据。

实例代码:

//4. flatMapIterable(Function(T,R))

// 对数据进行处理转换成Iterable来发射数据

Observable.range(1, 5)

.flatMapIterable(new Function>() {

@Override

public Iterable extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply: " + t);

ArrayList list = new ArrayList();

list.add(888);

list.add(999);

return list; // 将原始数据转换为两个数字发送

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept flatMapIterable(4): " + t);

}

});

输出:

--> apply: 1

--> accept flatMapIterable(4): 888

--> accept flatMapIterable(4): 999

--> apply: 2

--> accept flatMapIterable(4): 888

--> accept flatMapIterable(4): 999

--> apply: 3

--> accept flatMapIterable(4): 888

--> accept flatMapIterable(4): 999

--> apply: 4

--> accept flatMapIterable(4): 888

--> accept flatMapIterable(4): 999

--> apply: 5

--> accept flatMapIterable(4): 888

--> accept flatMapIterable(4): 999

3.5 flatMapIterable(mapper, resultSelector)

参数 mapper 接收原始数据,resultSelector 同时接收原始数据和 mapper 处理的数据,进行二次数据转换。

实例代码:

//5. flatMapIterable(Function(T,R),Function(T,T,R))

// 第一个func接受原始数据,转换数据,第二个func同时接受原始和处理的数据,进行二次转换处理

Observable.range(1, 3)

.flatMapIterable(new Function>() {

@Override

public Iterable extends Integer> apply(Integer t) throws Exception {

ArrayList list = new ArrayList();

list.add(888);

list.add(999);

return list; // 将原始数据转换为两个数字发送

}

}, new BiFunction() {

@Override

public Integer apply(Integer t1, Integer t2) throws Exception {

System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2);

return t1 + t2;// 将原始数据和处理过的数据组合进行二次处理发送

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept flatMapIterable(5): " + t);

}

});

输出:

--> apply(5): t1 = 1, t2 = 888

--> accept flatMapIterable(5): 889

--> apply(5): t1 = 1, t2 = 999

--> accept flatMapIterable(5): 1000

--> apply(5): t1 = 2, t2 = 888

--> accept flatMapIterable(5): 890

--> apply(5): t1 = 2, t2 = 999

--> accept flatMapIterable(5): 1001

--> apply(5): t1 = 3, t2 = 888

--> accept flatMapIterable(5): 891

--> apply(5): t1 = 3, t2 = 999

--> accept flatMapIterable(5): 1002

4. ConcatMap

concatMap 操作符的功能和 flatMap 是非常相似的,只是有一点,concatMap 最终输出的数据序列和原数据序列是一致,它是按顺序链接Observables,而不是合并(flatMap用的是合并)。

通过 mapper 处理原数据后,转换成 Observables ,按照顺序进行连接 Observables 发送数据。

解析: concatMap和flatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。区别:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。

实例代码:

// 1. concatMap(Function(T,R))

// 按照顺序依次处理原始数据和处理的数据

Observable.range(1, 3)

.concatMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(1): " + t);

return Observable.range(1, t).doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable t) throws Exception {

System.out.println("--> accept(1): Observable on Subscribe");// 当前的Observable被订阅

}

});

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept concatMap(1): " + t);

}

});

System.out.println("--------------------------------------------");

// 2. concatMap(mapper, prefetch)

// prefetch 参数是在处理后的Observables发射的数据流中预读数据个数,不影响原数据的发射和接收顺序

Observable.range(1, 3)

.concatMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(2): " + t);

return Observable.range(1, 3).doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable t) throws Exception {

System.out.println("--> accept(2): Observable on Subscribe");// 当前的Observable被订阅

}

});

}

}, 2).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept concatMap(2): " + t);

}

});

输出:

--> apply(1): 1

--> accept(1): Observable on Subscribe

--> accept concatMap(1): 1

--> apply(1): 2

--> accept(1): Observable on Subscribe

--> accept concatMap(1): 1

--> accept concatMap(1): 2

--> apply(1): 3

--> accept(1): Observable on Subscribe

--> accept concatMap(1): 1

--> accept concatMap(1): 2

--> accept concatMap(1): 3

--------------------------------------------

--> apply(2): 1

--> accept(2): Observable on Subscribe

--> accept concatMap(2): 1

--> accept concatMap(2): 2

--> accept concatMap(2): 3

--> apply(2): 2

--> accept(2): Observable on Subscribe

--> accept concatMap(2): 1

--> accept concatMap(2): 2

--> accept concatMap(2): 3

--> apply(2): 3

--> accept(2): Observable on Subscribe

--> accept concatMap(2): 1

--> accept concatMap(2): 2

--> accept concatMap(2): 3

5. SwitchMap

有选择的订阅 Observable,当原始 Observable 发射一个数据,通过 witchMap 返回一个 Observable,

当原始Observable发射一个新的数据时,它将取消订阅并停止监视产生执之前的Observable,开始监视当前新的Observable。

解析: 如果上一个任务尚未完成时,就开始下一个任务的话,上一个任务就会被取消掉。如果所有任务都是在同一个线程里执行的话,此时这个操作符与 ContactMap 一致,都是依次顺序执行。只有在不同的线程里执行的时候,即线程方案为newThread的时候,才会出现这种情况,常用于网络请求中。

实例代码:

// 1. witchMap(Function(T,R))

// 同一个线程执行

Observable.range(1, 3)

.switchMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(1): " + t);

return Observable.range(1, 3);// 每个任务指定在同一个线程执行

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept switchMap(1): " + t);

}

});

System.out.println("---------------------------------------");

// 2. witchMap(Function(T,R))

// 不同线程执行

Observable.range(1, 3)

.switchMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(2): " + t);

return Observable.range(1, 3)

.subscribeOn(Schedulers.newThread());// 每个任务指定在子线程执行

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept switchMap(2): " + t);

}

});

System.out.println("---------------------------------------");

// 3. switchMap(mapper, bufferSize)

// bufferSize 参数是从当前活动的Observable中预读数据的大小

Observable.range(1, 3)

.switchMap(new Function>() {

@Override

public ObservableSource extends Integer> apply(Integer t) throws Exception {

System.out.println("--> apply(3): " + t);

return Observable.range(1, 5).subscribeOn(Schedulers.newThread());

}

}, 3).subscribe(new Consumer() {// 指定缓存大小为3

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept switchMap(3): " + t);

}

});

输出:

--> apply(1): 1

--> accept switchMap(1): 1

--> accept switchMap(1): 2

--> accept switchMap(1): 3

--> apply(1): 2

--> accept switchMap(1): 1

--> accept switchMap(1): 2

--> accept switchMap(1): 3

--> apply(1): 3

--> accept switchMap(1): 1

--> accept switchMap(1): 2

--> accept switchMap(1): 3

---------------------------------------

--> apply(2): 1

--> apply(2): 2

--> apply(2): 3

--> accept switchMap(2): 1

--> accept switchMap(2): 2

--> accept switchMap(2): 3

---------------------------------------

--> apply(3): 1

--> apply(3): 2

--> apply(3): 3

--> accept switchMap(3): 1

--> accept switchMap(3): 2

--> accept switchMap(3): 3

--> accept switchMap(3): 4

--> accept switchMap(3): 5

接续:

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

java rx.observable_Rxjava2 Observable的数据变换详解及实例(一)相关推荐

  1. java rx.observable_Rxjava2 Observable的数据变换详解及实例(二)

    1. Window 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据. Window 和 Buffer 类似,但不是发射来自原始Obse ...

  2. java rx.observable_Rxjava2 Observable的条件操作符详解及实例

    简要: 需求了解: 在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据.跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运 ...

  3. java rx.observable_Rxjava2 Observable的错误处理操作详解及实例

    简要: 需求了解: Rxjava 中当数据处理派发中发生了异常 ,观察者会接受到一个 Error 的通知,那如果不想发射这个异常的通知,自己处理掉呢?答案当然是可以的,在 Rxjava 中很多操作符可 ...

  4. Java经典算法四十例编程详解+程序实例

    JAVA经典算法40例 [程序1] 题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第四个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少? 1.程序分析: ...

  5. java lock代码写法_java Lock接口详解及实例代码

    java  lock接口 java.util.concurrent.locks 接口lock public interface loce loce实现提供了比使用synchronized方法和语句可获 ...

  6. java程序日期转换_Java 日期转换详解及实例代码

    Java 日期转换 涉及的核心类:Date类.SimpleDateFormat类.Calendar类 一. Date型与long型 Date型转换为long型 Date date = new Date ...

  7. java builder pattern_Java Builder Pattern建造者模式详解及实例

    Java Builder Pattern 1.概念 将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示. [构建与表示分离,同构建不同表示] 与抽象工厂的区别:在建造者模式里,有个指 ...

  8. 2020年 第11届 蓝桥杯 Java B组 省赛真题详解及小结【第1场省赛 2020.7.5】

    蓝桥杯 Java B组 省赛决赛 真题详解及小结汇总[2013年(第4届)~2021年(第12届)] 第11届 蓝桥杯-第1.2次模拟(软件类)真题-(2020年3月.4月)-官方讲解视频 说明:部分 ...

  9. 2020年 第11届 蓝桥杯 Java C组 省赛真题详解及小结【第1场省赛 2020.7.5】

    蓝桥杯 Java B组 省赛真题详解及小结汇总[2013年(第4届)~2020年(第11届)] 注意:部分代码及程序 源自 蓝桥杯 官网视频(历年真题解析) 郑未老师. 2013年 第04届 蓝桥杯 ...

最新文章

  1. Python 【第八章】:JavaScript 、Dom、jQuery
  2. char转成string_真没想到,一个小小的String居然还有这么多窍门?
  3. Jupyter notebook 导入和卸载 conda 虚拟环境
  4. 如何通过Graph+AI的方法打造高精度风控模型
  5. 更换分布式文件系统副本组成员的硬件或操作系统——第一步:计划
  6. word vba 点击任意域代码,刷新整个文档的域代码值
  7. 为什么每个程序员都应该学习C语言?
  8. 7-9 找出最小值 (20 分)
  9. Linux zmap安装
  10. wordpress插件_哪个是最好的WordPress画廊插件? (性能比较)
  11. 基于Qt的智能管家客户端设计
  12. ajax的get json数据格式,jQuery / 用getJSON()方法加载JSON格式数据 - 汇智网
  13. 众多交通工具3dm Rhino资源素材一键即可获取
  14. 100G QSFP28 LR4 10km单模光模块特征
  15. 大学计算机基础知识判断题,大学计算机基础知识考试试题及答案
  16. Webpack安装与配置
  17. C Programming学习笔记【谭浩强老师编】(第四章选择结构程序设计)02 逻辑运算符和逻辑表达式
  18. Python 绘制遥感数字高程影像(DEM)
  19. CCC3.0学习笔记_认证和隐私保护
  20. 第一台计算机如何工作原理,世界上第一台计算机是什么原理_世界上第一台计算机...

热门文章

  1. 【作业】研一(互联网新技术作业)
  2. Sox 萨班斯法案--摘录
  3. Cocos Creator 打包WebMobile,实现资源代码分离,部署 cdn
  4. 秀米 ueditor 远程抓图到本地 select 图片丢失
  5. Unity3D游戏开发之GUI的使用
  6. android 获取nfc,Android:在服务类中读取NFC标签
  7. Java 日志框架 JUL
  8. 【云计算】制造业云计算应用趋势分析!
  9. 对当前计算机应用的理解论文,论当前计算机发展模式及改革计算机应用论文发表...
  10. SAP GUI快捷键