【RxJava】使用
【RxJava】使用
虽说Rxjava显然已经有些过时了,但是有些公司还在使用,为了能适应更多的业务代码,并提高自己的开发效率,所以这里仅做个Rxjava使用的总结,不涉及Rxjava内部的实现原理。
RxJava的核心就是异步数据流和响应式编程。
我们平时开发过程中的网络请求、数据库读写、文件读写、定时任务等各种耗时操作,都可以使用RxJava来完成。
在平时的开发中,我们可以把所有的事件(数据)我们都可以看成是一条河流,它可以被观察,被过滤等操作,也可以将多条河流汇合成一条新河流。
引入RxJava
只需要引入如下两个依赖即可使用rxjava:
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
RxJava的几个重要概念
- 观察者Observer:观察事件变化并处理的主要角色,消费者也可以理解为一种特殊的观察者
- 被观察者:触发事件并决定什么时候发送事件的主要角色
- 订阅Subscribe:观察者和被观察者建立关联的操作(代码中的体现经常是
被观察者去订阅观察者
)
Observable、Flowable、Single、Completable、Maybe都是被观察者,这几种被观察者可以通过 toObservable、toFlowable、toSingle、toCompletable、toMaybe 相互转化。
常用操作符及其使用
创建操作符
创建被观察者
的各种操作符:create()、just() 等等。
create操作符演示:
private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe..")}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t")}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message}")}override fun onComplete() {Log.i("testLog", "onComplete..")}})
}
日志打印如下:
I/testLog: onSubscribe..
I/testLog: onNext.. t = 发送一个事件1
I/testLog: onNext.. t = 发送一个事件2
I/testLog: onComplete..
当然,我们也可以使用Consumer
(消费者)充当观察者,Consumer只有一个方法即accept
,比起Observer
需要实现四个方法
来说,显然更加简洁了,
而当我们想要处理异常时,只需要再多传入一个专门接收Throwable
的Consumer
即可。使用如下:
Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到emitter.onNext("发送一个事件1")emitter.onNext("发送一个事件2")emitter.onError(Throwable("test error!"))// onComplete方法表示事件发送结束emitter.onComplete()}
}).subscribe(// 第一个Consumer,用来处理事件object : Consumer<String> {override fun accept(t: String) {Log.i("testLog", "accept.. t = $t")}},// 异常将会在这里的Consumer中处理object : Consumer<Throwable> {override fun accept(t: Throwable?) {Log.i("testLog", "accept onError.. e = ${t?.message}")}})日志打印如下:
I/testLog: accept.. t = 发送一个事件1
I/testLog: accept.. t = 发送一个事件2
I/testLog: accept onError.. e = test error!
just操作符:
通过just操作符,可以非常简单的完成create操作符实现的事情。
just 方法传参限制最多为10个,另外,just 内部实际上是调用了 fromArray
操作符方法,而fromArray
方法是不限制传参数量的。
just操作符的使用如下所示:
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("a", 2, "3").subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = a
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
转换操作符
map() / flatMap() / concatMap() 等。
map()
操作符可以将被观察者发送的数据类型转变成其他的类型。flatMap()
可以将事件序列中的元素进行整合加工,返回一个新的被观察者,在网络请求场景中比较常用。concatMap()
和flatMap()
基本一样,只不过concatMap()
转化出来的事件是有序的,而flatMap()
转化出来的事件是无序的。
map()操作符的使用如下:
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("2").map(object : Function<String, Int> {override fun apply(t: String): Int {// 事件发送出来后进入apply方法// 这里将这个事件(字符串2)转化为int类型并加1,然后返回return t.toString().toInt() + 1}}).subscribe(consumer)
}// 日志打印如下:
I/testLog: accept.. t = 3
flatMap()操作符的使用如下:
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}Observable.just("1", "2", "3", "4", "5").flatMap(object : Function<String, ObservableSource<Any>> {override fun apply(t: String): ObservableSource<Any> {// ObservableSource是被观察者的顶层父类,所以其实就是生成一个新的被观察者并返回// 这里拿到的 t 是无序的,如果需要有序,则使用concatMap即可// 这种场景类比于当前需要请求基于上一次请求的结果return Observable.just(t + "3")}}).subscribe(consumer)
}
组合操作符
- concat():将多个被观察者进行整合,得到一个新的被观察者
- merge():和
concat()
作用基本一样,只是concat()
是串行的,而merge()
是并行的
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 将两个被观察者进行整合,得到一个新的被观察者Observable.concat(Observable.just("1"),Observable.just("2")).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
功能操作符
subscribeOn()
:用来决定执行subscribe()
方法所处的线程,也就是发射事件所在的线程,该方法需要传入一个Scheduler对象,Schedulers.io()
和Schedulers.newThread()
都可以拿到一个Scheduler对象,它们都可以开启一个子线程,只是Schedulers.io()
的底层实现是线程池的形式。
observeOn()
:用来决定下游事件被处理所处的线程,该方法同样需要传入一个Scheduler对象,一般是该方法来切换回到主线程。
下面是它们的用法:
private fun testRxjava() {Observable.create(object : ObservableOnSubscribe<String> {override fun subscribe(emitter: ObservableEmitter<String>) {// 事件产生的地方,可以通过 emitter对象 来发送事件,发送一个事件,会被观察者的 onNext方法 监听到Thread.sleep(2000)emitter.onNext("发送一个事件1,当前线程为:" + Thread.currentThread().name)emitter.onNext("发送一个事件2,当前线程为:" + Thread.currentThread().name)// onComplete方法表示事件发送结束emitter.onComplete()}}).subscribeOn(Schedulers.newThread()) // 决定上游事件被处理所处的线程.observeOn(AndroidSchedulers.mainThread()) // 决定下游事件被处理所处的线程.subscribe(object : Observer<String> {override fun onSubscribe(d: Disposable) {// 被观察者和观察者建立订阅之后回调的方法Log.i("testLog", "onSubscribe.." + Thread.currentThread().name)}override fun onNext(t: String) {Log.i("testLog", "onNext.. t = $t " + Thread.currentThread().name)}override fun onError(e: Throwable) {// 可以在subscribe方法中手动抛出异常,如emitter.onError(Throwable("test error!"))// 但是会导致 onComplete() 方法无法回调,也就是说onComplete和onError是互斥的Log.i("testLog", "onError.. e = ${e.message + Thread.currentThread().name}")}override fun onComplete() {Log.i("testLog", "onComplete.." + Thread.currentThread().name)}})
}日志打印如下:
I/testLog: onSubscribe..main
I/testLog: onNext.. t = 发送一个事件1,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onNext.. t = 发送一个事件2,当前线程为:RxNewThreadScheduler-1 main
I/testLog: onComplete..main
过滤操作符
filter():过滤掉某些事件
private fun testRxjava() {val consumer = object : Consumer<Any> {override fun accept(t: Any?) {Log.i("testLog", "accept.. t = $t")}}// 发送[1,10]范围内的数值Observable.range(1, 10).filter(object : Predicate<Int> {override fun test(t: Int): Boolean {if (t < 5) {// 如果 t 小于5,就将 t 过滤掉return true} else {// 把 >= 5的t保留return false}}}).subscribe(consumer)
}日志打印如下:
I/testLog: accept.. t = 1
I/testLog: accept.. t = 2
I/testLog: accept.. t = 3
I/testLog: accept.. t = 4
【RxJava】使用相关推荐
- RxJava 实现模糊搜索
实现的效果图如下 下面说下实现的具体方法 1 引入库 implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC5"implement ...
- RxJava firstElement 与 lastElement 以及 elementAt
1 firstElement 文档如下 2 lastElement 文档如下 3 elementAt 文档如下 下面写一个下代码 firstElement Observable.just(1,2,3, ...
- RxJava 过滤操作符 throttleFirst 与 throttleLast 以及 sample
看文档发现 throttleFirst 与 throttleLast 以及 Sample 都跳到同一个界面Sample throttleFirst :在某段时间内,只发送该段时间内第1次事件(假如一个 ...
- RxJava 过滤操作符 distinct 和 distinctUntilChanged
distinct 看下文档 distinct : 过滤掉重复的元素 distinctUntilChanged: 过滤掉连续重复的元素,不连续重复的是不过滤 看下代码 1 distinct Obse ...
- RxJava 过滤操作符 take 与 takeLast
take 看下官方文档 take : 指定 观察者正序接受指定的items数量 takeLast 指定观察者正序接受最后指定的items的数量 看下demo take的代码 Observable.ju ...
- RxJava 过滤操作符skip 与 skipLast
skip 看下文档 skip 是正序跳过指定的items skipLast 是正序跳过指定最后几个items 下面看下代码 Observable.just(1,2,3,4,5,6).skip(1)// ...
- RxJava 变换操作符Map
看下文档如下 通过对每个项目应用函数来转换Observable发出的项目 个人理解为转换类型 下面写一个把int 类型转换为String 类型的demo Observable.create(new O ...
- RxJava 操作符 do
看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...
- RxJava debounce()和throttleWithTimeout()
官方地址:http://reactivex.io/documentation/operators/debounce.html debounce :防抖动 throttleWithTimeout:节流超 ...
- RxJava 解除订阅---------Disposable.dispose()方法
有时候我们需要解绑订阅,或者取消订阅, 这个时候就使用到了 Disposable.dispose()方法下面以一个案例说下使用方法 //Disposable.dispose()切断观察者 与 被观察者 ...
最新文章
- Win2K下关联进程/端口之代码初步分析
- 如何在防火墙或路由器中禁止访问一些公司不相关网站
- Pudb调试python
- 上一秒投简历下一秒被裁 ?小心,你的一举一动可能都在监控中
- Debian下使用Doxygen生成定制样式的开发文档
- ERP系统之JPJDE入门-1 JDE 简史
- linux系统能运行iis吗,Linux 下可以安装 IIS 吗
- linux centos用户修改密码,centos怎么修改用户密码
- mp4视频文件截图--h264解码成yuv再转存为bmp图片
- python输入若干个数字求和
- Hive开启WebUI
- 第十五届全国大学生智能汽车竞赛华南赛区获奖信息
- VUE + ONLYOFFICE
- gboard包名_如何在Android的Gboard键盘中搜索表情符号和GIF
- 用Python爬取豆瓣首页所有电影名称、每部电影影评及生成词云
- Openstack云平台的搭建与部署(具体实验过程截图评论拿)
- aux ps 和top_关于vmstat,top,ps aux查看的cpu占用率不一致的问题
- VSCode 环境配置管理
- Node.js - 自我总结
- 用迭代器遍历map 集合
热门文章
- 大华相机IP网段更新配置
- sidirect 连接西门子_如何配置DASSIDirect与西门子
- 假如你接近了“黑洞”,你很可能结束一切,也可能回到过去!
- java最新版下载地址
- 2022-2028年中国应急管理行业市场调查研究及发展前景展望报告
- python 储蓄计划_储蓄--投资恒式为什么不意味着计划的储蓄恒等于计划的投资?...
- 登录接口解析与接口测试用例
- Mac电脑隔空投递如何添加到菜单栏?
- 安全419《高级威胁检测与响应解决方案》系列访谈——未来智安(XDR SEC)篇
- linux与Windows下查看md5值