RxJava详解(基于2.X版本的功能操作符)
本章节讲述RxJava基于2.X版本的功能操作符
1.subscribe()
<1> 作用
订阅,即连接观察者 & 被观察者。
<2> 代码&结果
https://blog.csdn.net/weixin_37730482/article/details/69280013
有多个重载的方法。
2.Observable.subscribeOn() &Observable.observeOn()
<1> 作用
指定被观察者&观察者所在的线程。
<2> 代码&结果
https://blog.csdn.net/weixin_37730482/article/details/74460807
3.delay()
<1> 作用
使得被观察者延迟一段时间再发送事件。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import java.util.concurrent.TimeUnit;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava delay功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext("被观察者发送数据");Log.d("TAG", "被观察者发送数据 开始发送数据...");}}}).delay(5, TimeUnit.SECONDS)//被观察者延时5秒发送数据.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 被观察者发送数据 开始发送数据...//5秒后打印D/TAG: 观察者 onNext 方法 value.toString()----:被观察者发送数据
<4> 关闭页面
D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅
<5> delay重载方法
delay(long delay,TimeUnit unit)参数1:时间参数2:时间单位delay(long delay,TimeUnit unit,mScheduler scheduler)参数1:时间参数2:时间单位参数3:线程调度器delay(long delay,TimeUnit unit,boolean delayError)指定延迟时间 & 错误延迟 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常。参数1:时间参数2:时间单位参数3:错误延迟参数delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟。参数1:时间参数2:时间单位参数3:线程调度器参数4:错误延迟参数
4.doXXX()
<1> 作用
在某个事件的生命周期中调用。如发送事件前的初始化、发送事件后的回调请求等等。
具体方法如下
doOnSubscribe():观察者订阅时调用。doOnEach():被观察者调用一次执行一次。doOnNext():onNext方法调用前调用。doAfterNext():onNext方法调用后调用。doOnComplete():onComplete方法执行时调用。doOnError():onError方法执行时调用。doOnTerminate():正常发送完成&异常完成都执行。doFinally():最后执行。doAfterTerminate():真正的最后执行。
<2> 代码
package com.wjn.rxdemo.rxjava;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext(new RuntimeException());e.onComplete();}}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}
<3> 结果
被观察者 执行 e.onNext(1); 即发生第一次正确的onNext方法D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[1]D/TAG: 生命周期 doOnNext方法 执行...----:1D/TAG: 观察者 onNext 方法执行...value.toString()----:1D/TAG: 生命周期 doAfterNext方法 执行...----:1**************************************************************************被观察者 执行 e.onNext(new RuntimeException()); 即发生第二次错误的onNext方法D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnNext方法 执行...----:java.lang.RuntimeExceptionD/TAG: 观察者 onNext 方法执行...value.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doAfterNext方法 执行...----:java.lang.RuntimeException*****************************************************************************被观察者 执行 e.onComplete(); 即发生最后接收发生数据D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...
<4> 上述代码修改create操作符,只发送onComplete。
if (!e.isDisposed()) {e.onComplete();
}
结果
D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...
<5> 上述代码修改create操作符,只发送onError。
if (!e.isDisposed()) {e.onError(new RuntimeException());
}
结果
D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnErrorNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnError方法 执行...----:java.lang.RuntimeExceptionD/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onError 方法执行...e.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...
<6> 上述代码修改create操作符,为empty操作符。
代码
package com.wjn.rxdemo.rxjava;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.empty().doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}
结果
D/TAG: 生命周期 doOnSubscribe方法 执行...----:trueD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...
5. onErrorReturn()
<1> 作用
遇到观察者发送错误时,发送1个特殊事件 正常终止。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorReturn功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorReturn(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) throws Exception {return "Code:123";}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:Code:123
<4> 关闭页面
D/TAG: onDestroy方法 执行时是否断开----:true
6.onErrorResumeNext()&onExceptionResumeNext()
<1> 作用
遇到错误时,发送1个新的Observable。
onErrorResumeNext():拦截的错误 = Throwable。
onExceptionResumeNext():拦截的错误 = Exception。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorResumeNext功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {return Observable.just("遇到观察者发送的错误数据 修改后的内容");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:遇到观察者发送的错误数据 修改后的内容
7. retry()
<1> 作用
重试,即当出现错误时,让被观察者(Observable)重新发射数据。Throwable 和 Exception 都可拦截。
retry():出现错误时,让被观察者重新发送数据。若一直错误,则一直重新发送。如果不出现错误,不重复执行。
retry(long time):出现错误时,让被观察者重新发送数据。可设置重试次数。如果不出现错误,不重复执行。
retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。如果不出现错误,不重复执行。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava retry功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retry(2)//重试两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三
重试2次,一共执行3次。
<4> 关闭页面
D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅
8.retryUntil()
<1> 作用
出现错误后,判断是否需要重新发送数据。类似 retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BooleanSupplier;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava Map变换符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retryUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {return false;//一直重试}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
return false;//一直重试return true;//不重试
9.retryWhen()
<1> 作用
遇到错误时,将发生的错误传递给一个新的被观察者。并决定是否需要重新订阅原始被观察者。
<2> 代码
略
<3> 结果
略
10.repeat()
<1> 作用
重复不断地发送被观察者事件。
repeat():重复不断地发送被观察者事件。
repeat(long times):重复 times次 发送被观察者事件。
<2> 代码
package com.example.rxjava20;import android.os.Bundle;
import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava repeat功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onComplete();}}}).repeat(2)//重复两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}
<3> 结果
D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三
<4> 关闭页面
D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅
11.repeatWhen()
<1> 作用
有条件地、重复发送 被观察者事件。
<2> 代码
略
<3> 结果
略
RxJava详解(基于2.X版本的功能操作符)相关推荐
- react-navigation(6.0.6版本)使用详解(基于RN0.65*版本)
命令安装 // 安装基础包 ^6.0.6 yarn add @react-navigation/native -S // 安装路由包 ^6.2.5 yarn add @react-navigation ...
- android ------- 开发者的 RxJava 详解
在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github: https://github.com/ReactiveX/RxJava https://githu ...
- 扔物线------给 Android 开发者的 RxJava 详解
本文转载自扔物线的文章:http://gank.io/post/560e15be2dca930e00da1083 给 Android 开发者的 RxJava 详解 <p>作者:<a ...
- Android 开发者的 RxJava 详解 - 作者:扔物线
前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...
- 给 Android 开发者的 RxJava 详解(作者:扔物线)
前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...
- RxJava 详解 -- 作者:扔物线
转载自:http://gank.io/post/560e15be2dca930e00da1083 这篇文章的目的有两个: 1. 给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJa ...
- Android之RxJava 详解
前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近 ...
- python selenium爬虫_详解基于python +Selenium的爬虫
详解基于python +Selenium的爬虫 一.背景 1. Selenium Selenium 是一个用于web应用程序自动化测试的工具,直接运行在浏览器当中,支持chrome.firefox等主 ...
- 《嵌入式Linux软硬件开发详解——基于S5PV210处理器》——1.2 S5PV210处理器
本节书摘来自异步社区<嵌入式Linux软硬件开发详解--基于S5PV210处理器>一书中的第1章,第1.2节,作者 刘龙,更多章节内容可以访问云栖社区"异步社区"公众号 ...
最新文章
- pycharm代码自动补全功能
- T端音乐盒子-NPC脚本
- 【Flutter】Flutter 混合开发 ( Flutter 与 Native 通信 | 在 Flutter 端实现 EventChannel 通信 )
- Python 技巧篇-让我的程序暂停一下
- PyTorch可视化理解卷积神经网络
- 数据分析方法(一):对比与对标
- ACM 学习笔记(七) 贪心
- WPF学习笔记(6):DataSet更新后台数据库个别列失败的问题
- DRF serializer 自定义列
- 工业互联网平台TOP15发布!附15个平台详细介绍!
- Java语言实现会议安排问题,利用贪心法思想解决问题
- 记一次网站漏洞修复经历
- 前端开发工程师需要具备哪些专业技能?
- 关于struts.xml的配置思考。
- 胡玉平 计算机科学,基于代价敏感混合分裂策略的多决策树算法
- 教你2种常用的电商高并发处理解决方案
- APP设计之启动页和广告页
- 【解决方法】ubuntu20 hp1020 打印机不识别无反应
- NMEA GPRMC 格式图解,NMEA 工具的比较
- ic芯片方案设计流程你知道多少?
热门文章
- matlab有限元工具箱计算+python绘图
- AltiumDesigner20.0.10安装+防局域网(多版本支持)+许可带视频教程
- 三菱FX3U与2台台达温控器modbus通讯案例 功能:三菱FX3U与2台台达温控器进行modbus通讯
- php 微信分享快速实现
- 透过Python 将接收邮件邮件进行分类统计
- Kindeditor上传图片成功,但显示上传失败
- Microsoft Office 2010信息
- 【近期解决的小问题】
- 用matlab验证傅里叶变换的基本性质
- GFS/HDFS/TFS/FastDFS/Ceph/GlusterF