本章节讲述RxJava基于2.X版本的功能操作符

1.subscribe()

<1> 作用

订阅,即连接观察者 & 被观察者。

<2> 代码&结果

https://blog.csdn.net/weixin_37730482/article/details/69280013

有多个重载的方法。

2.Observable.subscribeOn() &Observable.observeOn() 

<1> 作用

指定被观察者&观察者所在的线程。

<2> 代码&结果

https://blog.csdn.net/weixin_37730482/article/details/74460807

3.delay()

<1> 作用

使得被观察者延迟一段时间再发送事件。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import java.util.concurrent.TimeUnit;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava delay功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext("被观察者发送数据");Log.d("TAG", "被观察者发送数据 开始发送数据...");}}}).delay(5, TimeUnit.SECONDS)//被观察者延时5秒发送数据.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 被观察者发送数据 开始发送数据...//5秒后打印D/TAG: 观察者 onNext 方法 value.toString()----:被观察者发送数据

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

<5> delay重载方法

delay(long delay,TimeUnit unit)参数1:时间参数2:时间单位delay(long delay,TimeUnit unit,mScheduler scheduler)参数1:时间参数2:时间单位参数3:线程调度器delay(long delay,TimeUnit unit,boolean delayError)指定延迟时间  & 错误延迟 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常。参数1:时间参数2:时间单位参数3:错误延迟参数delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟。参数1:时间参数2:时间单位参数3:线程调度器参数4:错误延迟参数

4.doXXX()

<1> 作用

在某个事件的生命周期中调用。如发送事件前的初始化、发送事件后的回调请求等等。

具体方法如下

doOnSubscribe():观察者订阅时调用。doOnEach():被观察者调用一次执行一次。doOnNext():onNext方法调用前调用。doAfterNext():onNext方法调用后调用。doOnComplete():onComplete方法执行时调用。doOnError():onError方法执行时调用。doOnTerminate():正常发送完成&异常完成都执行。doFinally():最后执行。doAfterTerminate():真正的最后执行。

<2> 代码

package com.wjn.rxdemo.rxjava;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext(new RuntimeException());e.onComplete();}}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}

<3> 结果

被观察者 执行 e.onNext(1); 即发生第一次正确的onNext方法D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[1]D/TAG: 生命周期 doOnNext方法 执行...----:1D/TAG: 观察者 onNext 方法执行...value.toString()----:1D/TAG: 生命周期 doAfterNext方法 执行...----:1**************************************************************************被观察者 执行 e.onNext(new RuntimeException()); 即发生第二次错误的onNext方法D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnNext方法 执行...----:java.lang.RuntimeExceptionD/TAG: 观察者 onNext 方法执行...value.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doAfterNext方法 执行...----:java.lang.RuntimeException*****************************************************************************被观察者 执行 e.onComplete(); 即发生最后接收发生数据D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<4> 上述代码修改create操作符,只发送onComplete。

if (!e.isDisposed()) {e.onComplete();
}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<5> 上述代码修改create操作符,只发送onError。

if (!e.isDisposed()) {e.onError(new RuntimeException());
}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnErrorNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnError方法 执行...----:java.lang.RuntimeExceptionD/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onError 方法执行...e.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<6> 上述代码修改create操作符,为empty操作符。

代码

package com.wjn.rxdemo.rxjava;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.empty().doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:trueD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

5. onErrorReturn()

<1> 作用

遇到观察者发送错误时,发送1个特殊事件 正常终止。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorReturn功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorReturn(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) throws Exception {return "Code:123";}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:Code:123

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:true

6.onErrorResumeNext()&onExceptionResumeNext()

<1> 作用

遇到错误时,发送1个新的Observable。

onErrorResumeNext():拦截的错误 = Throwable。

onExceptionResumeNext():拦截的错误 = Exception。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorResumeNext功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {return Observable.just("遇到观察者发送的错误数据 修改后的内容");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:遇到观察者发送的错误数据 修改后的内容

7. retry()

<1> 作用

重试,即当出现错误时,让被观察者(Observable)重新发射数据。Throwable 和 Exception 都可拦截。

retry():出现错误时,让被观察者重新发送数据。若一直错误,则一直重新发送。如果不出现错误,不重复执行。

retry(long time):出现错误时,让被观察者重新发送数据。可设置重试次数。如果不出现错误,不重复执行。

retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。如果不出现错误,不重复执行。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava retry功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retry(2)//重试两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三

重试2次,一共执行3次。

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

8.retryUntil()

<1> 作用

出现错误后,判断是否需要重新发送数据。类似 retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BooleanSupplier;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava Map变换符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retryUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {return false;//一直重试}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

return false;//一直重试return true;//不重试

9.retryWhen()

<1> 作用

遇到错误时,将发生的错误传递给一个新的被观察者。并决定是否需要重新订阅原始被观察者。

<2> 代码

<3> 结果

10.repeat()

<1> 作用

重复不断地发送被观察者事件。

repeat():重复不断地发送被观察者事件。

repeat(long times):重复 times次 发送被观察者事件。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava repeat功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onComplete();}}}).repeat(2)//重复两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

11.repeatWhen()

<1> 作用

有条件地、重复发送 被观察者事件。

<2> 代码

<3> 结果

RxJava详解(基于2.X版本的功能操作符)相关推荐

  1. react-navigation(6.0.6版本)使用详解(基于RN0.65*版本)

    命令安装 // 安装基础包 ^6.0.6 yarn add @react-navigation/native -S // 安装路由包 ^6.2.5 yarn add @react-navigation ...

  2. android ------- 开发者的 RxJava 详解

    在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:  https://github.com/ReactiveX/RxJava  https://githu ...

  3. 扔物线------给 Android 开发者的 RxJava 详解

    本文转载自扔物线的文章:http://gank.io/post/560e15be2dca930e00da1083 给 Android 开发者的 RxJava 详解 <p>作者:<a ...

  4. Android 开发者的 RxJava 详解 - 作者:扔物线

    前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...

  5. 给 Android 开发者的 RxJava 详解(作者:扔物线)

    前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...

  6. RxJava 详解 -- 作者:扔物线

    转载自:http://gank.io/post/560e15be2dca930e00da1083 这篇文章的目的有两个: 1. 给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJa ...

  7. Android之RxJava 详解

    前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...

  8. python selenium爬虫_详解基于python +Selenium的爬虫

    详解基于python +Selenium的爬虫 一.背景 1. Selenium Selenium 是一个用于web应用程序自动化测试的工具,直接运行在浏览器当中,支持chrome.firefox等主 ...

  9. 《嵌入式Linux软硬件开发详解——基于S5PV210处理器》——1.2 S5PV210处理器

    本节书摘来自异步社区<嵌入式Linux软硬件开发详解--基于S5PV210处理器>一书中的第1章,第1.2节,作者 刘龙,更多章节内容可以访问云栖社区"异步社区"公众号 ...

最新文章

  1. pycharm代码自动补全功能
  2. T端音乐盒子-NPC脚本
  3. 【Flutter】Flutter 混合开发 ( Flutter 与 Native 通信 | 在 Flutter 端实现 EventChannel 通信 )
  4. Python 技巧篇-让我的程序暂停一下
  5. PyTorch可视化理解卷积神经网络
  6. 数据分析方法(一):对比与对标
  7. ACM 学习笔记(七) 贪心
  8. WPF学习笔记(6):DataSet更新后台数据库个别列失败的问题
  9. DRF serializer 自定义列
  10. 工业互联网平台TOP15发布!附15个平台详细介绍!
  11. Java语言实现会议安排问题,利用贪心法思想解决问题
  12. 记一次网站漏洞修复经历
  13. 前端开发工程师需要具备哪些专业技能?
  14. 关于struts.xml的配置思考。
  15. 胡玉平 计算机科学,基于代价敏感混合分裂策略的多决策树算法
  16. 教你2种常用的电商高并发处理解决方案
  17. APP设计之启动页和广告页
  18. 【解决方法】ubuntu20 hp1020 打印机不识别无反应
  19. NMEA GPRMC 格式图解,NMEA 工具的比较
  20. ic芯片方案设计流程你知道多少?

热门文章

  1. matlab有限元工具箱计算+python绘图
  2. AltiumDesigner20.0.10安装+防局域网(多版本支持)+许可带视频教程
  3. 三菱FX3U与2台台达温控器modbus通讯案例 功能:三菱FX3U与2台台达温控器进行modbus通讯
  4. php 微信分享快速实现
  5. 透过Python 将接收邮件邮件进行分类统计
  6. Kindeditor上传图片成功,但显示上传失败
  7. Microsoft Office 2010信息
  8. 【近期解决的小问题】
  9. 用matlab验证傅里叶变换的基本性质
  10. GFS/HDFS/TFS/FastDFS/Ceph/GlusterF