【RxJava】使用

虽说Rxjava显然已经有些过时了,但是有些公司还在使用,为了能适应更多的业务代码,并提高自己的开发效率,所以这里仅做个Rxjava使用的总结,不涉及Rxjava内部的实现原理。

RxJava的核心就是异步数据流和响应式编程。

我们平时开发过程中的网络请求、数据库读写、文件读写、定时任务等各种耗时操作,都可以使用RxJava来完成。

在平时的开发中,我们可以把所有的事件(数据)我们都可以看成是一条河流,它可以被观察,被过滤等操作,也可以将多条河流汇合成一条新河流。

引入RxJava

只需要引入如下两个依赖即可使用rxjava:

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

RxJava的几个重要概念

  • 观察者Observer:观察事件变化并处理的主要角色,消费者也可以理解为一种特殊的观察者
  • 被观察者:触发事件并决定什么时候发送事件的主要角色
  • 订阅Subscribe:观察者和被观察者建立关联的操作(代码中的体现经常是被观察者去订阅观察者

Observable、Flowable、Single、Completable、Maybe都是被观察者,这几种被观察者可以通过 toObservable、toFlowable、toSingle、toCompletable、toMaybe 相互转化。

常用操作符及其使用

创建操作符

创建被观察者的各种操作符:create()、just() 等等。

create操作符演示:

private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe..")}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t")}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message}")}override fun onComplete() {Log.i("testLog", "onComplete..")}})
}

日志打印如下:

I/testLog: onSubscribe..
I/testLog: onNext.. t = 发送一个事件1
I/testLog: onNext.. t = 发送一个事件2
I/testLog: onComplete..

当然,我们也可以使用Consumer(消费者)充当观察者,Consumer只有一个方法即accept,比起Observer需要实现四个方法来说,显然更加简洁了,

而当我们想要处理异常时,只需要再多传入一个专门接收ThrowableConsumer即可。使用如下:

Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")emitter.onError(Throwable("test error!"))// onComplete方法表示事件发送结束emitter.onComplete()}
}).subscribe(// 第一个Consumer,用来处理事件object : Consumer<String> {override fun accept(t: String) {Log.i("testLog", "accept.. t = $t")}},// 异常将会在这里的Consumer中处理object : Consumer<Throwable> {override fun accept(t: Throwable?) {Log.i("testLog", "accept onError.. e = ${t?.message}")}})日志打印如下:
I/testLog: accept.. t = 发送一个事件1
I/testLog: accept.. t = 发送一个事件2
I/testLog: accept onError.. e = test error!

just操作符

通过just操作符,可以非常简单的完成create操作符实现的事情。

just 方法传参限制最多为10个,另外,just 内部实际上是调用了 fromArray 操作符方法,而fromArray方法是不限制传参数量的。

just操作符的使用如下所示:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("a", 2, "3").subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = a
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3

转换操作符

map() / flatMap() / concatMap() 等。

  • map()操作符可以将被观察者发送的数据类型转变成其他的类型
  • flatMap()可以将事件序列中的元素进行整合加工,返回一个新的被观察者,在网络请求场景中比较常用。
  • concatMap()flatMap()基本一样,只不过concatMap()转化出来的事件是有序的,而flatMap()转化出来的事件是无序的

map()操作符的使用如下:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("2").map(object : Function<String, Int> {override fun apply(t: String): Int {// 事件发送出来后进入apply方法// 这里将这个事件(字符串2)转化为int类型并加1,然后返回return t.toString().toInt() + 1}}).subscribe(consumer)
}// 日志打印如下:
I/testLog: accept.. t = 3

flatMap()操作符的使用如下:

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("1", "2", "3", "4", "5").flatMap(object : Function<String, ObservableSource<Any>> {override fun apply(t: String): ObservableSource<Any> {// ObservableSource是被观察者的顶层父类,所以其实就是生成一个新的被观察者并返回// 这里拿到的 t 是无序的,如果需要有序,则使用concatMap即可// 这种场景类比于当前需要请求基于上一次请求的结果return Observable.just(t + "3")}}).subscribe(consumer)
}

组合操作符

  • concat():将多个被观察者进行整合,得到一个新的被观察者
  • merge():和concat()作用基本一样,只是concat()串行的,而merge()并行
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 将两个被观察者进行整合,得到一个新的被观察者Observable.concat(Observable.just("1"),Observable.just("2")).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2

功能操作符

subscribeOn():用来决定执行subscribe()方法所处的线程,也就是发射事件所在的线程,该方法需要传入一个Scheduler对象Schedulers.io()Schedulers.newThread()都可以拿到一个Scheduler对象,它们都可以开启一个子线程,只是Schedulers.io()的底层实现是线程池的形式。

observeOn():用来决定下游事件被处理所处的线程,该方法同样需要传入一个Scheduler对象,一般是该方法来切换回到主线程。

下面是它们的用法:

private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到Thread.sleep(2000)emitter.onNext("发送一个事件1,当前线程为:" + Thread.currentThread().name)emitter.onNext("发送一个事件2,当前线程为:" + Thread.currentThread().name)// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribeOn(Schedulers.newThread()) // 决定上游事件被处理所处的线程.observeOn(AndroidSchedulers.mainThread()) // 决定下游事件被处理所处的线程.subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe.." + Thread.currentThread().name)}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t " + Thread.currentThread().name)}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message + Thread.currentThread().name}")}override fun onComplete() {Log.i("testLog", "onComplete.." + Thread.currentThread().name)}})
}日志打印如下:
I/testLog: onSubscribe..main
I/testLog: onNext.. t = 发送一个事件1,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onNext.. t = 发送一个事件2,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onComplete..main

过滤操作符

filter():过滤掉某些事件

private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 发送[1,10]范围内的数值Observable.range(1, 10).filter(object : Predicate<Int> {override fun test(t: Int): Boolean {if (t < 5) {// 如果 t 小于5,就将 t 过滤掉return true} else {// 把 >= 5的t保留return false}}}).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
I/testLog: accept.. t = 4

【RxJava】使用相关推荐

  1. RxJava 实现模糊搜索

    实现的效果图如下 下面说下实现的具体方法 1 引入库 implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC5"implement ...

  2. RxJava firstElement 与 lastElement 以及 elementAt

    1 firstElement 文档如下 2 lastElement 文档如下 3 elementAt 文档如下 下面写一个下代码 firstElement Observable.just(1,2,3, ...

  3. RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample

    看文档发现 throttleFirst 与 throttleLast 以及 Sample 都跳到同一个界面Sample throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个 ...

  4. RxJava 过滤操作符 distinct 和 distinctUntilChanged

    distinct  看下文档 distinct  : 过滤掉重复的元素 distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤 看下代码 1 distinct Obse ...

  5. RxJava 过滤操作符 take 与 takeLast

    take 看下官方文档 take : 指定 观察者正序接受指定的items数量 takeLast 指定观察者正序接受最后指定的items的数量 看下demo take的代码 Observable.ju ...

  6. RxJava 过滤操作符skip 与 skipLast

    skip 看下文档 skip 是正序跳过指定的items skipLast 是正序跳过指定最后几个items 下面看下代码 Observable.just(1,2,3,4,5,6).skip(1)// ...

  7. RxJava 变换操作符Map

    看下文档如下 通过对每个项目应用函数来转换Observable发出的项目 个人理解为转换类型 下面写一个把int 类型转换为String 类型的demo Observable.create(new O ...

  8. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  9. RxJava debounce()和throttleWithTimeout()

    官方地址:http://reactivex.io/documentation/operators/debounce.html debounce :防抖动 throttleWithTimeout:节流超 ...

  10. RxJava 解除订阅---------Disposable.dispose()方法

    有时候我们需要解绑订阅,或者取消订阅, 这个时候就使用到了 Disposable.dispose()方法下面以一个案例说下使用方法 //Disposable.dispose()切断观察者 与 被观察者 ...

最新文章

  1. Win2K下关联进程/端口之代码初步分析
  2. 如何在防火墙或路由器中禁止访问一些公司不相关网站
  3. Pudb调试python
  4. 上一秒投简历下一秒被裁 ?小心,你的一举一动可能都在监控中
  5. Debian下使用Doxygen生成定制样式的开发文档
  6. ERP系统之JPJDE入门-1 JDE 简史
  7. linux系统能运行iis吗,Linux 下可以安装 IIS 吗
  8. linux centos用户修改密码,centos怎么修改用户密码
  9. mp4视频文件截图--h264解码成yuv再转存为bmp图片
  10. python输入若干个数字求和
  11. Hive开启WebUI
  12. 第十五届全国大学生智能汽车竞赛华南赛区获奖信息
  13. VUE + ONLYOFFICE
  14. gboard包名_如何在Android的Gboard键盘中搜索表情符号和GIF
  15. 用Python爬取豆瓣首页所有电影名称、每部电影影评及生成词云
  16. Openstack云平台的搭建与部署(具体实验过程截图评论拿)
  17. aux ps 和top_关于vmstat,top,ps aux查看的cpu占用率不一致的问题
  18. VSCode 环境配置管理
  19. Node.js - 自我总结
  20. 用迭代器遍历map 集合

热门文章

  1. 大华相机IP网段更新配置
  2. sidirect 连接西门子_如何配置DASSIDirect与西门子
  3. 假如你接近了“黑洞”,你很可能结束一切,也可能回到过去!
  4. java最新版下载地址
  5. 2022-2028年中国应急管理行业市场调查研究及发展前景展望报告
  6. python 储蓄计划_储蓄--投资恒式为什么不意味着计划的储蓄恒等于计划的投资?...
  7. 登录接口解析与接口测试用例
  8. Mac电脑隔空投递如何添加到菜单栏?
  9. 安全419《高级威胁检测与响应解决方案》系列访谈——未来智安(XDR SEC)篇
  10. linux与Windows下查看md5值