Kotlin之Flow由浅入深,对比Rxjava
原文链接
sequence
sequence又被称为惰性集合操作,下面举例说明
fun main() {val sequence = sequenceOf(1, 2, 3, 4)val result: Sequence<Int> = sequence.map { i ->println("Map $i")i * 2}.filter { i ->println("Filter $i")i % 3 == 0}println(result.first())// }
执行结果如下
Map 1 Filter 2 Map 2 Filter 4 Map 3 Filter 6 6
惰性的概念首先说是在
println(result.first())
之前,关于result的map和filter都不会执行,只有result被使用的时候才会执行,而且执行是惰性的,即map取出一个元素交给filter,而不是map对所有元素都处理过户再交给filter,而且当满足条件后就不会往下执行,由结果可以看出,没有对Sequence的4进行map和filter操作,因为3已经满足了条件而List是没有惰性的
fun main() {val sequence = listOf<Int>(1, 2, 3, 4)val result: List<Int> = sequence.map { i ->println("Map $i")i * 2}.filter { i ->println("Filter $i")i % 3 == 0}println(result.first()) }
执行结果
Map 1 Map 2 Map 3 Map 4 Filter 2 Filter 4 Filter 6 Filter 8 6
List是声明后立即执行,处理流程如下
- {1,2,3,4}->{2,4,6,8}
- 遍历判断是否能被3整除
由此可以总结出Sequence的优势
- 一旦满足遍历需要退出的条件,就可以省略后面不必要的遍历
- 像List这种实现Iterable接口的集合类,每调用一次函数就会生成一个新的Iterable,下一个函数再基于新的Iterable执行,每次函数调用产生的临时Iterable会导致额外的内存消耗,而Sequence在整个流程只有一个,且不改变原sequence
- 因此,Sequence这种数据类型可以在数据量较大或数据量未知的时候,作为流式处理的解决方案
但是由上也可以看出Squence是同步完成这些操作的,那有没有办法使用异步完成那些map和filter等操作符呢,答案就是Flow
Flow
如果我们把list转成Flow并在最后调用collect{},会产生一个编译错误,这是因为Flow是基于协程构建的,默认具有异步功能,因此你只能在协程里使用它,Flow也是
cold stream
,也就是直到你调用terminal operator
(比如collect{}),Flow才会执行,而且如果重复调用collect,则调用会得到同样的结果Flow提供了很多操作符,比如map,filter,scan,groupBy等,它们都是
cold stream
的,我们可以利用这些操作符完成异步代码。假如我们想使用map操作符来执行一些耗时任务(这里我们用延迟来模拟耗时),在Rxjava中你可以使用flatmap来进行一些逻辑,比如fun main() {val disposable = Observable.fromArray("A","Ba","ora","pear","fruit").flatMap {string->//flatMap是异步,map是同步println("flatMap $string")Observable.just(string).delay(1000,TimeUnit.MILLISECONDS).map { it.length }//这里是异步,对"A","Ba","ora","pear","fruit"异步执行转换操作,因此下面也不是顺序打印}.subscribe{print("subscribe:::")println(it)}Thread.sleep(10012) }
执行结果
flatMap A flatMap Ba flatMap ora flatMap pear flatMap fruit subscribe:::1 subscribe:::5 subscribe:::2 subscribe:::3 subscribe:::4
使用Flow完成类似上面的操作是这样的
fun main() {runBlocking {flowOf("A","Ba","ora","pear","fruit").map { stringToLength(it) }.collect { println(it) }//会依次打印 12345} } private suspend fun stringToLength(it:String):Int{delay(1000)return it.length }
terminal operator
上面提到collect()是terminal operator,意思就是仅当你调用它的时候才会去得到结果,和sequence使用的时候才会执行,Rxjava调用
subscribe
后才会执行,Flow中的terminal operator是suspend函数,其他的terminal operator有toList,toSet;first(),reduce(),flod()等取消 Cancellation
每次设置Rxjava订阅时,我们都必须考虑合适取消这些订阅,以免发生内存溢出或者,在生命周期结束后依然在后台执行任务(expired task working in background),调用subscribe后,Rxjava会s给我们返回一个Disposable对象,在需要时利用它的disposable方法可以取消订阅,如果你有多个订阅,你可以把这些订阅放在
CompositeDisposable
,并在需要的时候调用它的clear()方法或者dispose方法val compositeDisposable = CompositeDisposable() compositeDisposable.add(disposable) compositeDisposable.clear() compositeDisposable.dispose()
但是在协程作用内你完全不用考虑这些,因为只会在作用域内执行,作用域外会自动取消
Errors 处理异常
Rxjava最有用的功能之一是处理错误的方式,你可以在onError里处理所有的异常。同样Flow有类似的方法
catch{}
,如果你不使用此方法,那你的应用会抛出异常或者崩溃,你可以像之前一样使用try catch或者catch{}来处理错误,下面让我们来模拟一些错误fun main() {runBlocking {flowOfAnimeCharacters().map { stringToLength(it)}.collect { println(it) }} } private fun flowOfAnimeCharacters() = flow {emit("Madara")emit("Kakashi")//throwing some errorthrow IllegalStateException()emit("Jiraya")emit("Itachi")emit("Naruto") } private suspend fun stringToLength(it:String):Int{delay(1000)return it.length }
如果你运行了这个代码,那么程序显然会抛出异常并在控制台打印,下面我们分别使用上述的两种方式处理异常
fun main() {//使用try catchrunBlocking {try {flowOfAnimeCharacters().map { stringToLength(it)}.collect { println(it) }}catch (e:Exception){println(e.stackTrace)//虽然有异常,但是我们对异常做了处理,不会导致应用崩溃}finally {println("Beat it")}} //------------------------------------ @ExperimentalCoroutinesApi fun main() {//using catch{}runBlocking {flowOfAnimeCharacters().map { stringToLength(it)}.catch { println(it) }//不过这个好像还是实验性质的api,不在方法上使用注解会警告,并且catch{}必须凡在terminal operator之前.collect { println(it) }} }
resume 恢复
如果代码在执行过程中出现了异常,我们希望使用默认数据或者完整的备份来恢复数据流,在Rxjava中我们可以是使用 onErrorResumeNext()或者 onErrorReturn(),在Flow中我们依然可以使用catch{},但是我们需要在catch{}代码块里使用emit()来一个一个的发送备份数据,甚至如果我们愿意,可以使用emitAll()可以产生一个新的Flow,
@ExperimentalCoroutinesApi fun main() {//using catch{}runBlocking {flowOfAnimeCharacters().catch { emitAll(flowOf("Minato", "Hashirama")) }.collect { println(it) }}
现在你可以得到如下结果
Madara Kakashi Minato Hashirama
flowOn()
默认情况下Flow数据会运行在调用者的上下文(线程)中,如果你想随时切换线程比如像Rxjava的observeOn(),你可以使用flowOn()来改变上游的上下文,这里的上游是指调用flowOn之前的所有操作符,官方文档有很好的说明
/*** Changes the context where this flow is executed to the given [context].--改变Flow执行的上下文* This operator is composable and affects only preceding operators that do not have its own context.---这个操作符是可以多次使用的,它仅影响操作符之前没有自己上下文的操作符* This operator is context preserving: [context] **does not** leak into the downstream flow.--这个操作符指定的上下文不会污染到下游,它会保留默认的上下文,例如下面例子中最后的操作符single()使用的是默认的上下文而不是上游指定的Dispatchers.Default** For example:** ```* withContext(Dispatchers.Main) {* val singleValue = intFlow // will be executed on IO if context wasn't specified before* .map { ... } // Will be executed in IO* .flowOn(Dispatchers.IO)* .filter { ... } // Will be executed in Default* .flowOn(Dispatchers.Default)* .single() // Will be executed in the Main* }* ```
Completion
当Flow完成发送是数据时,无论是成功或者失败,你都想做一些事情,onCompletion()可以帮助你做到这一点:如下
@ExperimentalCoroutinesApi fun main() {//using catch{}runBlocking {flowOfAnimeCharacters().flowOn(Dispatchers.Default).catch {emitAll(flowOf("Minato", "Hashirama"))}.onCompletion {println("Done")it?.let { throwable -> println(throwable) }//这里由于上面处理了异常,所以这里就不会再有异常传递,这里自然也不会执行}.collect {println(it)}} } //catch{}的作用就是捕获异常并恢复新的Flow,这使得我们最终得到原始数据“Madara”, “Kakashi”和备份数据 “Minato”, “Hashirama”,捕获异常之后就是一份全新的没有异常的数据, onCompletion{…} and catch{…}都是mediator operators,他们时使用的顺序很重要
总结
我们使用Flow构建器创建了Flow,其中最基本的构建器是flowOf(),创建之后运行这个Flow需要使用terminal operators,由于terminal operator是suspend function ,因此我们需要在协程作用域内编写Flow代码,如果你不想使用这种嵌套调用而是链式调用,你可以使用 onEach{…}集合launchIn()。使用catch{}操作符来处理异常,并且当发生异常时也可以提供一个备份数据(如果你想这么做)。当上游的数据处理完或发生异常之后,使用onCompletion()来执行一些操作(感觉有点像finally)。。所有的操作符都会默认运行在调用函数的上下文中,可以使用flowOn()来切换上游的上下文
Kotlin之Flow由浅入深,对比Rxjava相关推荐
- Android Kotlin之Flow数据流
文章目录 Flow介绍 使用举例 常用操作符 创建操作符 回调操作符 变换操作符 过滤操作符 组合操作符 功能性操作符 末端操作符 冷流 vs 热流 SharedFlow shareIn将普通flow ...
- 【Kotlin协程】基于RxJava项目的Coroutine改造
最近,Android宣布彻底废弃AsyncTask,推荐Coroutine作为首选的异步编程方案. 如果说AsyncTask被Coroutine替代毫无悬念,那RxJava与Coroutine如何取舍 ...
- Kotlin与Java语法对比总结
文章目录 前言 一.变量 二.函数 三.程序的逻辑控制 1.条件语句 2.循环语句 四.面向对象编程 1.类与对象 2.继承 3.构造函数 4.接口 5.数据类与单例类 五.Lambda编程 1.集合 ...
- 【深入kotlin】 - Flow 进阶
Flow 上下文 Flow 的收集动作总是发生在调用协程的上下文当中,而非定义 Flow 的上下文. fun log(msg: String) = println("[${Thread.cu ...
- Android Kotlin Paging3 Flow完整教程
准备好接口 package com.example.android_learn_paging.netimport com.example.android_learn_paging.model.NetD ...
- RxJava VS kotlin flow
1.基础概念介绍 1.1 观察者模式 观察者模式,其实对于Android开发者而言,并不陌生,button的setOnClickListener,就是一个典型的观察者模式.控件button是被观察者, ...
- 线程切换哪家强?RxJava与Flow的操作符对比
作者:fundroid 链接:https://juejin.cn/post/6943037393893064734#heading-13 Flow作为Coroutine版的RxJava,同RxJava ...
- sharedpreferences使用方法_Google 推荐在 MVVM 架构中使用 Kotlin Flow
前言 在之前分享过一篇 Jetpack 综合实战应用 Jetpack 实战:神奇宝贝 ,这个项目主要包了以下功能: 自定义 RemoteMediator 实现 network + db 的混合使用 ( ...
- 大型Android项目架构:基于组件化+模块化+Kotlin+协程+Flow+Retrofit+Jetpack+MVVM架构实现WanAndroid客户端
前言:苟有恒,何必三更眠五更起:最无益,莫过一日曝十日寒. 前言 之前一直想写个 WanAndroid 项目来巩固自己对 Kotlin+Jetpack+协程 等知识的学习,但是一直没有时间.这里重新行 ...
最新文章
- 今晚开播 | 人脸识别的最新进展以及工业级大规模人脸识别实践探讨
- 剑指offer_第19题_顺时针打印矩阵_Python
- 转: object 和embed 标签播放flash
- VS Code 的常用快捷键和插件
- mysql delete返回值_Mybatis执行sql(insert、update、delete)返回值问题
- Android StateFlow详解
- 升级项目到.NET Core 2.0,在Linux上安装Docker,并成功部署
- HSSFCellStyle.ALIGN_CENTER报错
- 怎么在html的img src=src的值这里调用js方法或变量获取图片地址
- 众筹网站系统源码+手机端
- 以太坊2.0存款合约地址余额已突破50万ETH
- bitmap格式分析(转)
- 汉字计算机编码是谁发明的,神奇的汉字编码,了解一下
- 计算机专业电路基础高考试卷,计算机专业电路基础试题(4页)-原创力文档
- 宽带上行下行测试软件,上行网速(电信300m宽带上行下行)
- 最短路 POJ2387
- DT财经:2018北京城市大数据活跃报告
- 练习二:工作日天气预报
- 大事件!PCIe SSD与SATA SSD同价啦
- 路由与交换(一):路由概念及基本配置
热门文章
- Linux磁盘阵列(三分钟学会)
- 简单上手springcloud,eureka加fengin实现服务调用
- 学位论文中章标题与图表题注自动编号的新技巧
- python for循环写在一行_python开发如何将嵌套 for 循环写成单行?
- 怎么用计算机唱歌 百度网盘,【唱歌教程】唱歌技巧和发声方法初学者唱歌 百度云...
- 利用js写的见缝插针小游戏(推荐给新手)
- 牛客每日练习----逆序对,星图,小周的曲射炮
- 启示录:日本东京都二子玉川站TOD成功建设经验
- 手机APP调用支付宝支付(java服务端)
- 新一轮零食竞争开启:三只松鼠向左,良品铺子向右