1. Window

定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据。

Window 和 Buffer 类似,但不是发射来自原始Observable的数据包,它发射的是 Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发 射一个onCompleted通知。

和 Buffer一样,Window 有很多变体,每一种都以自己的方式将原始Observable分解为多个作为结果的Observable,每一个都包含一个映射原始数据的 window 。用Window操作符的术语描述就是,当一个窗口打开(whenawindow "opens")意味着一个新的Observable已经发射 (产生)了,而且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭 (whenawindow "closes")意味着发射(产生)的Observable停止发射原始Observable的数据, 并且发射终止通知onCompleted给它的观察者们。

在RxJava中有许多种Window操作符的方法。

1.1 window(closingSelector)

window 的这个方法会立即打开它的第一个窗口。每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并立即打开一个新窗口。用这个方法,这种window变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。

解析: 一开始开启一个 window 接收原始数据,每当它观察到closingSelector返回的 Observable发射了一个对象时,它就关闭当前打开的窗口并取消此时订阅closingSelector 的Observable ( 此时可能是没有数据 window )并立即打开一个新窗口,注意: 每个窗口开启前都会去订阅一个closingSelector返回的 Observable。

实例代码:

// 1. window(Callable boundary)

// 开启一个window,并订阅观察boundary返回的Observable发射了一个数据,

// 则关闭此window,将收集的数据以Observable发送, 重新订阅boundary返回的Observable,开启新window

Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)

.window(new Callable>() {

@Override

public Observable call() throws Exception {

System.out.println("--> call(1)");

return Observable.timer(2, TimeUnit.SECONDS); // 两秒后关闭当前窗口

}

}).subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

// 接受每个window接受的数据的Observable

t.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept(1): " + t);

}

});

}

});

输出:

--> call(1)

--> accept(1): 1

--> accept(1): 2

--> accept(1): 3

--> call(1)

--> accept(1): 4

--> accept(1): 5

--> call(1)

--> accept(1): 6

--> accept(1): 7

--> call(1)

--> accept(1): 8

--> accept(1): 9

--> call(1)

--> accept(1): 10

1.2 window(openingIndicator, closingIndicator)

当 openingIndicator 发射一个数据,就会打开一个 window, 同时订阅 closingIndicator 返回的Observable,当这个Observable发射一个数据,就结束此 window 和 ,发送收集数据的 Observable。

无论何时,只要window观察到windowOpenings这个Observable发射了一个 Opening对象,它就打开一个窗口,并且同时调用 closingSelector 生成一个与那个窗口关联的关闭 (closing)Observable 。当这个关闭 (closing)Observable 发射了一个对象时,window 操作符就会关闭那个窗口以及关联的closingSelector的 Observable。

注意: 对这个方法来说,由于当前窗口的关闭和新窗口的打开是由单独的 Observable 管理的,它创建的窗口可能会存在重叠(重复某些来自原始Observable的数据) 或间隙(丢弃某些来自原始Observable的数据)。

实例代码:

// 2. window(ObservableSource openingIndicator, Function> closingIndicator)

// 当openingIndicator发射一个数据,就会打开一个window, 同时订阅closingIndicator返回的Observable,

// 当这个Observable发射一个数据,就结束此window以及对应的closingIndicator,发送收集数据的 Observable。

Observable openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

.doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable t) throws Exception {

System.out.println("--> openingIndicator is subscribe!");

}

}).doOnComplete(new Action() {

@Override

public void run() throws Exception {

System.out.println("--> openingIndicator is completed!");

}

}).doOnNext(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> openingIndicator emitter: " + t);

}

});

Observable dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)

.doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable t) throws Exception {

System.out.println("--> DataSource is subscribe!");

}

}).doOnNext(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> DataSource emitter: " + t);

}

});

dataSource.window(openingIndicator, new Function>() {

@Override

public Observable apply(Long t) throws Exception {

System.out.println("--> apply(2): " + t);

return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable t) throws Exception {

System.out.println("--> closingIndicator is subscribe!");

}

});

}

}).subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

System.out.println("-------------------> new window data");

t.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept(2): " + t);

}

});

}

});

输出:

--> DataSource is subscribe!

--> openingIndicator is subscribe!

--> openingIndicator emitter: 1

--> DataSource emitter: 1

-------------------> new window data

--> apply(2): 1

--> closingIndicator is subscribe!

--> openingIndicator emitter: 2

--> DataSource emitter: 2

-------------------> new window data

--> apply(2): 2

--> closingIndicator is subscribe!

--> accept(2): 2

--> accept(2): 2

--> openingIndicator emitter: 3

--> DataSource emitter: 3

-------------------> new window data

--> apply(2): 3

--> closingIndicator is subscribe!

--> accept(2): 3

--> accept(2): 3

--> accept(2): 3

--> DataSource emitter: 4

--> openingIndicator emitter: 4

--> accept(2): 4

--> accept(2): 4

-------------------> new window data

--> apply(2): 4

--> closingIndicator is subscribe!

--> DataSource emitter: 5

--> accept(2): 5

--> accept(2): 5

--> openingIndicator emitter: 5

1.3 window(count)

这个 window 的方法立即打开它的第一个窗口。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了 onError 或onCompleted 通知它也会关闭当前窗口。

这种window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始 Observable发射的数据是 一一对应 的。

实例代码:

// 3. window(count)

// 以count为缓存大小收集的不重叠的Observables对象,接受的数据与原数据彼此对应

Observable.range(1, 20)

.window(5)// 设置缓存大小为5

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

System.out.println("--------------> new data window");

t.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept window(3): " + t);

}

});

}

});

输出:

--------------> new data window

--> accept window(3): 1

--> accept window(3): 2

--> accept window(3): 3

--> accept window(3): 4

--> accept window(3): 5

--------------> new data window

--> accept window(3): 6

--> accept window(3): 7

--> accept window(3): 8

--> accept window(3): 9

--> accept window(3): 10

--------------> new data window

--> accept window(3): 11

--> accept window(3): 12

--> accept window(3): 13

--> accept window(3): 14

--> accept window(3): 15

--------------> new data window

--> accept window(3): 16

--> accept window(3): 17

--> accept window(3): 18

--> accept window(3): 19

--> accept window(3): 20

1.4 window(count,skip)

这个window 的方法立即打开它的第一个窗口。原始Observable每发射 skip 项数据它就打开 一个新窗口(例如,如果skip等于3,每到第三项数据,它会创建一个新窗口)。每当当前窗口发射了 count 项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable 收到了onError或 onCompleted 通知它也会关闭当前窗口。

解析: window 一开始打开一个 window,每发射 skip 项数据就会打开一个 window 独立收集 原始数据,当 window 收集了 count 个数据就会关闭,开启另外一个。当原始Observable发送了onError或者onCompleted通知也会关闭当前窗口。

skip = count: 会依次顺序接受原始数据,同window(count)

skip > count: 两个窗口可能会有 skip-count 项数据丢失

skip < count: 两个窗口可能会有 count-skip 项数据重叠

实例代码:

// 4. window(count,skip)

// window一开始打开一个window,每发射skip项数据就会打开一个window独立收集原始数据

// 当window收集了count个数据就会关闭window,开启另外一个。

// 当原始Observable发送了onError 或者 onCompleted 通知也会关闭当前窗口。

// 4.1 skip = count: 会依次顺序接受原始数据,同window(count)

Observable.range(1, 10)

.window(2, 2)// skip = count, 数据会依次顺序输出

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

// 4.2 skip > count: 两个窗口可能会有 skip-count 项数据丢失

Observable.range(1, 10)

.window(2, 3)// skip > count, 数据会存在丢失

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

// 4.3 skip < count: 两个窗口可能会有 count-skip 项数据重叠

Observable.range(1, 10)

.window(3, 2)// skip < count, 数据会重叠

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

输出:

--> accept window(4-1): 1 , ThreadID: 11

--> accept window(4-1): 2 , ThreadID: 11

--> accept window(4-1): 4 , ThreadID: 12

--> accept window(4-1): 3 , ThreadID: 11

--> accept window(4-1): 5 , ThreadID: 12

--> accept window(4-1): 6 , ThreadID: 12

--> accept window(4-1): 7 , ThreadID: 13

--> accept window(4-1): 8 , ThreadID: 13

--> accept window(4-1): 9 , ThreadID: 13

--> accept window(4-1): 10 , ThreadID: 14

--> accept window(4-2): 1 , ThreadID: 15

--> accept window(4-2): 2 , ThreadID: 15

--> accept window(4-2): 4 , ThreadID: 16

--> accept window(4-2): 5 , ThreadID: 16

--> accept window(4-2): 7 , ThreadID: 17

--> accept window(4-2): 8 , ThreadID: 17

--> accept window(4-2): 10 , ThreadID: 18

--> accept window(4-3): 1 , ThreadID: 19

--> accept window(4-3): 2 , ThreadID: 19

--> accept window(4-3): 3 , ThreadID: 19

--> accept window(4-3): 3 , ThreadID: 20

--> accept window(4-3): 4 , ThreadID: 20

--> accept window(4-3): 5 , ThreadID: 20

--> accept window(4-3): 5 , ThreadID: 21

--> accept window(4-3): 6 , ThreadID: 21

--> accept window(4-3): 7 , ThreadID: 21

--> accept window(4-3): 7 , ThreadID: 22

--> accept window(4-3): 8 , ThreadID: 22

--> accept window(4-3): 9 , ThreadID: 22

--> accept window(4-3): 9 , ThreadID: 23

--> accept window(4-3): 10 , ThreadID: 23

1.5 window(timespan, TimeUnit)

这个window 的方法立即打开它的第一个窗口收集数据。每当过了 timespan 这么长的时间段它就关闭当前窗口并打开一个新窗口(时间单位是 unit ,可选在调度器 scheduler 上执行)收集数据。如果从原始 Observable 收到了 onError或 onCompleted通知它也会关闭当前窗口。

这种window 方法发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

实例代码:

// 5. window(long timespan, TimeUnit unit)

// 每当过了 timespan 的时间段,它就关闭当前窗口并打开另一个新window收集数据

Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)

.window(2, TimeUnit.SECONDS)// 间隔2秒关闭当前 window 并打开一个新 window 收集数据

//.window(2, TimeUnit.SECONDS, Schedulers.newThread())// 指定在 newThread 线程中

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() );

}

});

}

});

输出:

--> accept window(5): 1 , ThreadID: 11

--> accept window(5): 2 , ThreadID: 11

--> accept window(5): 3 , ThreadID: 11

--> accept window(5): 4 , ThreadID: 14

--> accept window(5): 5 , ThreadID: 14

--> accept window(5): 6 , ThreadID: 15

--> accept window(5): 7 , ThreadID: 16

--> accept window(5): 8 , ThreadID: 16

--> accept window(5): 9 , ThreadID: 17

--> accept window(5): 10 , ThreadID: 17

1.6 window(timespan, TimeUnit, count)

这个 window 的方法立即打开它的第一个窗口。这个变体是 window(count) 和 window(timespan,unit[,scheduler]) 的结合,每当过了 timespan 的时长或者当前窗口收到了 count 项数据,它就关闭当前窗口并打开另一个。如果从原始 Observable收到了 onError 或 onCompleted 通知它也会关闭当前窗口。

这种window方法发射 一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是 一一对应 的。

实例代码:

// 6. window(long timespan, TimeUnit unit, long count)

// 每当过了timespan的时间段或者当前窗口收到了count项数据,它就关闭当前window并打开另一个window收集数据

Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS)

.window(2, TimeUnit.SECONDS, 5)// 每隔2秒关闭当前收集数据的window并开启一个window收集5项数据

//.window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 )// 指定在 newThread 线程中

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() );

}

});

}

});

输出:

--> accept window(6): 1 , ThreadID: 11

--> accept window(6): 2 , ThreadID: 11

--> accept window(6): 3 , ThreadID: 11

--> accept window(6): 4 , ThreadID: 11

--> accept window(6): 5 , ThreadID: 11

--> accept window(6): 6 , ThreadID: 14

--> accept window(6): 7 , ThreadID: 14

--> accept window(6): 8 , ThreadID: 14

--> accept window(6): 9 , ThreadID: 14

--> accept window(6): 10 , ThreadID: 14

--> accept window(6): 11 , ThreadID: 15

--> accept window(6): 12 , ThreadID: 15

1.7 window(timespan, timeskip, TimeUnit)

这个 window 的方法立即打开它的第一个窗口。随后每当过了timeskip 的时长就打开一个新窗口(时间单位是 unit,可选在调度器 scheduler 上执行),当窗口打开的时长达 到 timespan ,它就关闭当前打开的窗口。如果从原始Observable收到 了 onError 或 onCompleted 通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的 timeskip 和 timespan 的值。

解析: 在每一个 timeskip 时期内都创建一个新的 window,然后独立收集 timespan 时间段的原始Observable发射的每一项数据。注意:因为每个 window 都是独立接收数据,当接收数据的时间与创建新 window 的时间不一致时会有数据项重复,丢失等情况。

skip = timespan: 会依次顺序接受原始数据,同window(count)

skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失

skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠

实例代码:

// 7. window(long timespan, long timeskip, TimeUnit unit)

// 在每一个timeskip时期内都创建一个新的window,然后独立收集timespan时间段的原始Observable发射的每一项数据,

// 如果timespan长于timeskip,它发射的数据包将会重叠,因此可能包含重复的数据项。

// 7.1 skip = timespan: 会依次顺序接受原始数据,同window(count)

Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)

.window(1, 1, TimeUnit.SECONDS)// 设置每秒创建一个window,收集2秒的数据

//.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())// 指定在 newThread 线程中

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

// 7.2 skip > timespan: 两个窗口可能会有 skip-timespan 项数据丢失

Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)

.window(1, 2, TimeUnit.SECONDS)// 设置每秒创建一个window,收集2秒的数据

//.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())// 指定在 newThread 线程中

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

// 7.3 skip < timespan: 两个窗口可能会有 timespan-skip 项数据重叠

Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)

.window(2, 1, TimeUnit.SECONDS)// 设置每秒创建一个window,收集2秒的数据

//.window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())// 指定在 newThread 线程中

.subscribe(new Consumer>() {

@Override

public void accept(Observable t) throws Exception {

t.observeOn(Schedulers.newThread())

.subscribe(new Consumer() {

@Override

public void accept(Long t) throws Exception {

System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId());

}

});

}

});

输出:

--> accept window(7-1): 1 , ThreadID: 11

--> accept window(7-1): 2 , ThreadID: 11

--> accept window(7-1): 3 , ThreadID: 14

--> accept window(7-1): 4 , ThreadID: 15

--> accept window(7-1): 5 , ThreadID: 17

----------------------------------------------------------------------

--> accept window(7-2): 1 , ThreadID: 11

--> accept window(7-2): 3 , ThreadID: 14

--> accept window(7-2): 5 , ThreadID: 15

----------------------------------------------------------------------

--> accept window(7-3): 1 , ThreadID: 11

--> accept window(7-3): 2 , ThreadID: 11

--> accept window(7-3): 2 , ThreadID: 14

--> accept window(7-3): 3 , ThreadID: 14

--> accept window(7-3): 3 , ThreadID: 15

--> accept window(7-3): 4 , ThreadID: 15

--> accept window(7-3): 4 , ThreadID: 16

--> accept window(7-3): 5 , ThreadID: 16

--> accept window(7-3): 5 , ThreadID: 17

2. GroupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 的一个子序列。

RxJava实现了 groupBy 操作符。它返回Observable的一个特殊子类 GroupedObservable ,实现了GroupedObservable 接口的对象有一个额外的方法getKey,这个 Key 用于将数据分组到指定的Observable。有一个版本的 groupBy 允许你传递一个变换函数,这样它可以在发射结果 GroupedObservable 之前改变数据项。

如果你取消订阅一个 GroupedObservable ,那个 Observable 将会终止。如果之后原始的 Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个 Key 创建一个新的 GroupedObservable。

注意: groupBy将原始 Observable 分解为一个发射多个 GroupedObservable的Observable,一旦有订阅,每个 GroupedObservable 就开始缓存数据。因此,如果你忽略这 些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable 。你应该使用像take(0) 这样会丢弃自己的缓存的操作符。

2.1 groupBy(keySelector)

GroupBy 操作符将原始 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始 Observable 数据序列的一个子序列。哪个数据项由哪一个 Observable 发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个 Observable 发射。还有一个 delayError 参数的方法,指定是否延迟 Error 通知的Observable。

实例代码:

// 1. groupBy(keySelector)

// 将原始数据处理后加上分组tag,通过GroupedObservable发射分组数据

Observable.range(1, 10)

.groupBy(new Function() {

@Override

public String apply(Integer t) throws Exception {

// 不同的key将会产生不同分组的Observable

return t % 2 == 0 ? "Even" : "Odd"; // 将数据奇偶数进行分组,

}

}).observeOn(Schedulers.newThread())

.subscribe(new Consumer>() {

@Override

public void accept(GroupedObservable grouped) throws Exception {

// 得到每个分组数据的的Observable

grouped.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

// 得到数据

System.out.println("--> accept groupBy(1): groupKey: " + grouped.getKey() + ", value: " + t);

}

});

}

});

输出:

--> accept groupBy(1): groupKey: Odd, value: 1

--> accept groupBy(1): groupKey: Odd, value: 3

--> accept groupBy(1): groupKey: Odd, value: 5

--> accept groupBy(1): groupKey: Odd, value: 7

--> accept groupBy(1): groupKey: Odd, value: 9

--> accept groupBy(1): groupKey: Even, value: 2

--> accept groupBy(1): groupKey: Even, value: 4

--> accept groupBy(1): groupKey: Even, value: 6

--> accept groupBy(1): groupKey: Even, value: 8

--> accept groupBy(1): groupKey: Even, value: 10

2.2 groupBy(keySelector, valueSelector)

GroupBy 操作符通过 keySelector 将原始 Observable 按照 Key 分组,产生不同的 Observable,再通过 valueSelector 对原始的数据进行处理,在发送每一个被处理完成的数据。

实例代码:

// 2. groupBy(Function(T,R),Function(T,R))

// 第一个func对原数据进行分组处理(仅仅分组添加key,不处理原始数据),第二个func对原始数据进行处理

Observable.range(1, 10)

.groupBy(new Function() {

@Override

public String apply(Integer t) throws Exception {

// 对原始数据进行分组处理

return t % 2 == 0 ? "even" : "odd";

}

},new Function() {

@Override

public String apply(Integer t) throws Exception {

// 对原始数据进行数据转换处理

return t + " is " + (t % 2 == 0 ? "even" : "odd");

}

}).observeOn(Schedulers.newThread()).subscribe(new Consumer>() {

@Override

public void accept(GroupedObservable grouped) throws Exception {

grouped.subscribe(new Consumer() {

@Override

public void accept(String t) throws Exception {

// 接受最终的分组处理以及原数据处理后的数据

System.out.println("--> accept groupBy(2): groupKey = " + grouped.getKey()

+ ", value = " + t);

}

});

}

});

输出:

--> accept groupBy(2): groupKey = odd, value = 1 is odd

--> accept groupBy(2): groupKey = odd, value = 3 is odd

--> accept groupBy(2): groupKey = odd, value = 5 is odd

--> accept groupBy(2): groupKey = odd, value = 7 is odd

--> accept groupBy(2): groupKey = odd, value = 9 is odd

--> accept groupBy(2): groupKey = even, value = 2 is even

--> accept groupBy(2): groupKey = even, value = 4 is even

--> accept groupBy(2): groupKey = even, value = 6 is even

--> accept groupBy(2): groupKey = even, value = 8 is even

--> accept groupBy(2): groupKey = even, value = 10 is even

3. Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果。

3.1 scan(accumulator)

Scan 操作符对原始 Observable 发射的第一项数据应用一个函数,然后将那个函数的结果作为 自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做 accumulator。

解析: 先发送原始数据第一项数据,然后将这个数据与下一个原始数据作为参数传递给 accumulator, 处理后发送这个数据,并与下一个原始数据一起传递到下一次 accumulator ,直到数据序列结束。类似一个累积的过程。

实例代码:

// 1. scan(BiFunction(Integer sum, Integer t2))

// 接受数据序列,从第二个数据开始,每次会将上次处理数据和现在接受的数据进行处理后发送

Observable.range(1, 10)

.scan(new BiFunction() {

@Override

public Integer apply(Integer LastItem, Integer item) throws Exception {

System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item);

return LastItem + item; // 实现求和操作

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept scan(1): " + t);

}

});

输出:

--> accept scan(1): 1

--> apply: LastItem = 1, CurrentItem = 2

--> accept scan(1): 3

--> apply: LastItem = 3, CurrentItem = 3

--> accept scan(1): 6

--> apply: LastItem = 6, CurrentItem = 4

--> accept scan(1): 10

--> apply: LastItem = 10, CurrentItem = 5

--> accept scan(1): 15

--> apply: LastItem = 15, CurrentItem = 6

--> accept scan(1): 21

--> apply: LastItem = 21, CurrentItem = 7

--> accept scan(1): 28

--> apply: LastItem = 28, CurrentItem = 8

--> accept scan(1): 36

--> apply: LastItem = 36, CurrentItem = 9

--> accept scan(1): 45

--> apply: LastItem = 45, CurrentItem = 10

--> accept scan(1): 55

3.2 scan(initialValue, accumulator)

有一个 scan 操作符的方法,你可以传递一个种子值给累加器函数的第一次调用(Observable 发射的第一项数据)。如果你使用这个版本,scan 将发射种子值作为自己的第一项数据。

注意: 传递 null 作为种子值与不传递是不同的,null 种子值是合法的。

解析: 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据。

实例代码:

// 2. scan(R,Func2)

// 指定初始种子值,第一次发送种子值,后续发送原始数据序列以及累计处理数据

Observable.range(1, 10)

.scan(100, new BiFunction() { // 指定初始种子数据为100

@Override

public Integer apply(Integer lastValue, Integer item) throws Exception {

System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item);

return lastValue + item; // 指定初值的求和操作

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("--> accept sacn(2) = " + t);

}

});

输出:

--> accept sacn(2) = 100

--> apply: lastValue = 100, item = 1

--> accept sacn(2) = 101

--> apply: lastValue = 101, item = 2

--> accept sacn(2) = 103

--> apply: lastValue = 103, item = 3

--> accept sacn(2) = 106

--> apply: lastValue = 106, item = 4

--> accept sacn(2) = 110

--> apply: lastValue = 110, item = 5

--> accept sacn(2) = 115

--> apply: lastValue = 115, item = 6

--> accept sacn(2) = 121

--> apply: lastValue = 121, item = 7

--> accept sacn(2) = 128

--> apply: lastValue = 128, item = 8

--> accept sacn(2) = 136

--> apply: lastValue = 136, item = 9

--> accept sacn(2) = 145

--> apply: lastValue = 145, item = 10

--> accept sacn(2) = 155

注意: 这个操作符默认不在任何特定的调度器上执行。

Javadoc: scan(initialValue, accumulator)

4. Cast

Cast 将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是 map 的一个特殊版本。转换失败会有Error通知。

4.1 cast(clazz)

将原始数据强制转换为指定的 clazz 类型,如果转换成功发送转换后的数据,否则发送Error通知。一般用于 数据类型的转换 和 数据实际类型的检查(多态)。

实例代码:

//cast(clazz)

// 1. 基本类型转换

Observable.range(1, 5)

.cast(Integer.class)

.subscribe(new Consumer() {

@Override

public void accept(Integer t) throws Exception {

System.out.println("-- accept cast(1): " + t);

}

});

// 2. 转换失败通知

System.out.println("------------------------------------");

Observable.just((byte)1)

.cast(Integer.class)

.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("--> onSubscribe(2)");

}

@Override

public void onNext(Integer t) {

System.out.println("--> onNext(2) = " + t);

}

@Override

public void onError(Throwable e) {

System.out.println("--> onError(2) = " + e.toString());

}

@Override

public void onComplete() {

System.out.println("--> onComplete(2)");

}

});

System.out.println("------------------------------------");

class Animal{

public int id;

}

class Dog extends Animal{

public String name;

@Override

public String toString() {

return "Dog [name=" + name + ", id=" + id + "]";

}

}

// 3. 多态转换,检查数据的实际类型

Animal animal = new Dog();

animal.id = 666;

Observable.just(animal)

.cast(Dog.class)

.subscribe(new Consumer() {

@Override

public void accept(Dog t) throws Exception {

System.out.println("--> accept cast(3): " + t);

}

});

输出:

-- accept cast(1): 1

-- accept cast(1): 2

-- accept cast(1): 3

-- accept cast(1): 4

-- accept cast(1): 5

------------------------------------

--> onSubscribe(2)

--> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer

------------------------------------

--> accept cast(3): Dog [name=null, id=666]

小结:

在实际开发场景中,比如网络数据请求场景,原始的数据格式或类型可能并不满足开发的实际需要,需要对数据进行处理。数据变换操作在实际开发场景中还是非常多的,所以数据的变换是非常重要的。使用Rx的数据变换操作可以轻松完成大多数场景的数据变换操作,提高开发效率。

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

java rx.observable_Rxjava2 Observable的数据变换详解及实例(二)相关推荐

  1. java rx.observable_Rxjava2 Observable的数据变换详解及实例(一)

    简要: 需求了解: 对于 Observable 发射的数据有的时候可能不满足我们的要求,或者需要转化为其他类型的数据,比如:缓存,数据类型转化,数据拦截等.此时可以使用 Rx 中的一些对于数据操作的操 ...

  2. java rx.observable_Rxjava2 Observable的条件操作符详解及实例

    简要: 需求了解: 在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据.跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运 ...

  3. java rx.observable_Rxjava2 Observable的错误处理操作详解及实例

    简要: 需求了解: Rxjava 中当数据处理派发中发生了异常 ,观察者会接受到一个 Error 的通知,那如果不想发射这个异常的通知,自己处理掉呢?答案当然是可以的,在 Rxjava 中很多操作符可 ...

  4. Java经典算法四十例编程详解+程序实例

    JAVA经典算法40例 [程序1] 题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第四个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少? 1.程序分析: ...

  5. java lock代码写法_java Lock接口详解及实例代码

    java  lock接口 java.util.concurrent.locks 接口lock public interface loce loce实现提供了比使用synchronized方法和语句可获 ...

  6. java程序日期转换_Java 日期转换详解及实例代码

    Java 日期转换 涉及的核心类:Date类.SimpleDateFormat类.Calendar类 一. Date型与long型 Date型转换为long型 Date date = new Date ...

  7. java builder pattern_Java Builder Pattern建造者模式详解及实例

    Java Builder Pattern 1.概念 将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示. [构建与表示分离,同构建不同表示] 与抽象工厂的区别:在建造者模式里,有个指 ...

  8. 2020年 第11届 蓝桥杯 Java B组 省赛真题详解及小结【第1场省赛 2020.7.5】

    蓝桥杯 Java B组 省赛决赛 真题详解及小结汇总[2013年(第4届)~2021年(第12届)] 第11届 蓝桥杯-第1.2次模拟(软件类)真题-(2020年3月.4月)-官方讲解视频 说明:部分 ...

  9. 2020年 第11届 蓝桥杯 Java C组 省赛真题详解及小结【第1场省赛 2020.7.5】

    蓝桥杯 Java B组 省赛真题详解及小结汇总[2013年(第4届)~2020年(第11届)] 注意:部分代码及程序 源自 蓝桥杯 官网视频(历年真题解析) 郑未老师. 2013年 第04届 蓝桥杯 ...

最新文章

  1. python pip 安装与使用_Python pip 安装与使用(安装、更新、删除)
  2. 实现java多线程的3种方式,99%人没用过第3种
  3. Ecplise中怎样导入Maven项目
  4. unity hub是什么东西_Unity可编程渲染管线(SRP)教程:一、自定义管线
  5. 中国银行刘东海:净值产品短期难当主角 需关注资产集中处置风险
  6. MFC使用SaveAs函数保存Excel文件时,弹出“文件已存在”问题
  7. 74hc595数码管C语言,74HC595 数码管程序
  8. 学习+彭伟《揭秘深度强化学习》PDF+源代码+资料
  9. BT5 CDLinux+U盘启动 破解无线网络
  10. 使用SpotBugs 进行代码检查
  11. HMI-29-【运动模式】转速表实现及中心油耗仪表实现
  12. kubectl 命令详解(三十一):rollout history
  13. 编程序,输入长方形的两边长a和b,输出长方形的周长和面积
  14. Midjourney之外21款免费的AI Image画图网站集合
  15. 时间序列分析这件小事(八)----格兰杰因果关系检验
  16. MAC 安装flutter环境
  17. html5之role作用
  18. 第四代计算机相关资料,当前的计算机一般被认为是第四代计算机,它所采用的逻辑元件是...
  19. JS学习笔记13-操作内联样式
  20. Foursquare数据集介绍

热门文章

  1. android智能手机推荐,android智能手机推荐!
  2. 联盟人口最多的服务器,魔兽世界国服人口普查,人数排名前十个服务器中有四个联盟碾压服...
  3. Python——How to use python pip/pur
  4. 公安部科信局主管杂志《中国安防》深度专访欧科云链副总裁张超
  5. easyui 实现表格字段排序
  6. Qt QTableView表格排序
  7. Android Studio 新版本 Logcat 速查
  8. EXCEL01:excel与数据格式
  9. Arduino各开发板
  10. php文件上传css,CSS_文件上传input file简便美化方案(css),文件上传input在各个浏览器里 - phpStudy...