Scalaz(58)- scalaz-stream: fs2 parallel processing demo
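On the surface, a Stream represents an endless sequence of data elements. "Sequence" means the elements have a fixed order, so they must also be processed in order: one element is finished before the next one starts. Seen this way, a Stream does not look like a good tool for parallel computation.

However, fs2 does not parallelize individual elements. Its unit of parallelism is the whole Stream: fs2 can run several Streams at the same time, as the merge function does. This is what makes parallel processing with Streams possible in fs2.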
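In general, we may want parallelism at several stages of a Stream:
1. Reading from several data sources at once, producing elements in no particular order.
2. Processing a sequence of incoming elements concurrently, e.g. with map (update), filter, and so on.
3. Writing a sequence of elements to the destination (Sink) concurrently, in no particular order.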
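Let's build an example to demonstrate parallel processing in fs2. We simulate reading strings from 3 files, then count how often vowels appear across those 3 files. We assume that both file reading and vowel counting incur random latency. We'll see how to run these steps in parallel and how much faster parallelism makes them.

First we define some helper functions for tracing and for simulating delays: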
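```scala
// assumes import fs2._ and scala.concurrent.duration._, plus the implicit
// Strategy and Scheduler defined in the full source at the end of this post

// tracing: prints the prompt each time an element passes through
def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }}
//> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

// simulated latency: re-emits each element after a random delay in [0, max)
def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
  val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) }
  delay.flatMap {d => Task.now(a).schedule(d.millis) }
}  //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.
```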
log is a tracing function. randomDelay simulates latency: it delays each element by a random time below max, using Task.schedule.
Unlike scalaz-stream 0.8, fs2 reimplements file I/O. It no longer relies on Java's string handling, nor on scodec for binary data conversion. Here is how a file is read with fs2:
```scala
val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024)
//> s1 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024)
//> s2 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024)
//> s3 : fs2.Stream[fs2.Task,Byte] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
```
The signature of fs2.io.file.readAll is:

```scala
def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] ={...}
```
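readAll reads Byte data from the file chunk by chunk, at most chunkSize bytes at a time, until the end of the file. It returns a Stream[F,Byte]. We still need to convert the Bytes to Strings and split the text into lines. fs2 provides the relevant functions in its text object: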
```scala
object text {
  private val utf8Charset = Charset.forName("UTF-8")

  /** Converts UTF-8 encoded byte stream to a stream of `String`. */
  def utf8Decode[F[_]]: Pipe[F, Byte, String] =
    _.chunks.through(utf8DecodeC)

  /** Converts UTF-8 encoded `Chunk[Byte]` inputs to `String`. */
  def utf8DecodeC[F[_]]: Pipe[F, Chunk[Byte], String] = {
    /**
     * Returns the number of continuation bytes if `b` is an ASCII byte or a
     * leading byte of a multi-byte sequence, and -1 otherwise.
     */
    def continuationBytes(b: Byte): Int = {
      if ((b & 0x80) == 0x00) 0       // ASCII byte
      else if ((b & 0xE0) == 0xC0) 1  // leading byte of a 2 byte seq
      else if ((b & 0xF0) == 0xE0) 2  // leading byte of a 3 byte seq
      else if ((b & 0xF8) == 0xF0) 3  // leading byte of a 4 byte seq
      else -1                         // continuation byte or garbage
    }
    ...

  /** Encodes a stream of `String` in to a stream of bytes using the UTF-8 charset. */
  def utf8Encode[F[_]]: Pipe[F, String, Byte] =
    _.flatMap(s => Stream.chunk(Chunk.bytes(s.getBytes(utf8Charset))))

  /** Encodes a stream of `String` in to a stream of `Chunk[Byte]` using the UTF-8 charset. */
  def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =
    _.map(s => Chunk.bytes(s.getBytes(utf8Charset)))

  /** Transforms a stream of `String` such that each emitted `String` is a line from the input. */
  def lines[F[_]]: Pipe[F, String, String] = {
    ...
```
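The continuationBytes helper exists because readAll hands out fixed-size chunks, and a chunk boundary can fall in the middle of a multi-byte UTF-8 character. The sketch below reuses that classification to hold back an incomplete trailing sequence and prepend it to the next chunk. It is only an illustration of the idea under that assumption, not the elided body of utf8DecodeC. The names Utf8ChunkSketch, completePrefixLength and decodeChunks are hypothetical.

```scala
import java.nio.charset.StandardCharsets

object Utf8ChunkSketch {
  // Same byte classification as continuationBytes in fs2's text object.
  def continuationBytes(b: Byte): Int =
    if ((b & 0x80) == 0x00) 0       // ASCII byte
    else if ((b & 0xE0) == 0xC0) 1  // leading byte of a 2 byte seq
    else if ((b & 0xF0) == 0xE0) 2  // leading byte of a 3 byte seq
    else if ((b & 0xF8) == 0xF0) 3  // leading byte of a 4 byte seq
    else -1                         // continuation byte or garbage

  // Hypothetical helper: length of the prefix that ends on a character boundary.
  // Only the last 4 bytes are inspected, since a UTF-8 sequence is at most 4 bytes.
  def completePrefixLength(bytes: Array[Byte]): Int = {
    val stop = math.max(0, bytes.length - 4)
    // scan backwards for the last ASCII or leading byte
    (bytes.length - 1 to stop by -1).find(i => continuationBytes(bytes(i)) >= 0) match {
      case Some(i) =>
        val needed  = continuationBytes(bytes(i)) // continuation bytes this sequence needs
        val present = bytes.length - 1 - i        // continuation bytes actually in this chunk
        if (present >= needed) bytes.length else i
      case None => bytes.length // nothing decodable to hold back; let the decoder replace it
    }
  }

  // Hypothetical decoder: decodes each chunk's complete prefix and carries the
  // incomplete tail into the next chunk (a chunk may therefore yield "").
  def decodeChunks(chunks: Seq[Array[Byte]]): List[String] = {
    var carry = Array.empty[Byte]
    val out = List.newBuilder[String]
    chunks.foreach { c =>
      val all = carry ++ c
      val n = completePrefixLength(all)
      out += new String(all, 0, n, StandardCharsets.UTF_8)
      carry = all.drop(n)
    }
    if (carry.nonEmpty) out += new String(carry, StandardCharsets.UTF_8)
    out.result()
  }
}
```

If "h\u00e9llo" is split after its second byte, decoding the first chunk on its own yields a replacement character. decodeChunks instead yields List("h", "\u00e9llo").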
utf8Decode and lines are exactly what we need; utf8Encode goes in the opposite direction. They are all Pipes, so we can attach them to a Stream directly with through:
```scala
val startTime = System.currentTimeMillis   //> startTime : Long = 1472444756321
val s1lines = s1.through(text.utf8Decode).through(text.lines)
  .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
//> s1lines : Int = 479
println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms")
//> reading s1 479 lines in 5370ms

val startTime2 = System.currentTimeMillis  //> startTime2 : Long = 1472444761691
val s2lines = s2.through(text.utf8Decode).through(text.lines)
  .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
//> s2lines : Int = 174
println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms")
//> reading s2 174 lines in 1923ms
val startTime3 = System.currentTimeMillis  //> startTime3 : Long = 1472444763614
val s3lines = s3.through(text.utf8Decode).through(text.lines)
  .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
//> s3lines : Int = 174
println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms")
//> reading s3 174 lines in 1928ms
println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms")
//> reading all three files 827 total lines in 9221ms
```
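In the example above we use runFold to count the lines in each file, and randomDelay to inject a random delay while reading. Reading and converting the three files yielded 827 lines in total and took 9221ms.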
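The body of text.lines is elided in the excerpt above, but the problem it must solve is clear. Decoded strings arrive in arbitrary pieces, so a line can be split across two pieces, and the unfinished tail must be held back until the next newline. The sketch below shows that idea on plain Scala collections. splitLines is a hypothetical helper, not fs2's implementation. Counting its output with foldLeft mirrors the runFold(0)((b,_) => b + 1) used above.

```scala
object LinesSketch {
  // Hypothetical helper: splits incoming text pieces into complete lines,
  // carrying an unfinished trailing fragment into the next piece.
  // Any fragment left at the end of input is emitted as the last line.
  def splitLines(chunks: Seq[String]): List[String] = {
    val out = List.newBuilder[String]
    val buf = new StringBuilder
    chunks.foreach { chunk =>
      chunk.foreach { ch =>
        if (ch == '\n') {
          // drop a preceding '\r' so "\r\n" line endings are handled too
          if (buf.length > 0 && buf.charAt(buf.length - 1) == '\r') buf.setLength(buf.length - 1)
          out += buf.toString
          buf.clear()
        } else buf += ch
      }
    }
    if (buf.length > 0) out += buf.toString
    out.result()
  }
}
```

For example, the pieces "ab\ncd", "ef\n" and "g" yield the three lines "ab", "cdef" and "g", even though "cdef" never appears whole in any single piece.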
fs2's parallel combinator concurrent.join has this signature:

```scala
def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}
```
The argument outer of join is a two-level Stream (a Stream of Streams): Stream[F,Stream[F,O]]. So we first need to reshape our types:
```scala
val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
//> lines1 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
//> lines2 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
//> lines3 : fs2.Stream[fs2.Task,String] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>)
// a Stream whose elements are the three line Streams
val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3)
//> ss : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,String]] = Segment(Emit(Chunk(evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>), evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>))))
```
Now ss has the type that join expects. Let's measure how fast the parallel version is:
```scala
val ss_start = System.currentTimeMillis    //> ss_start : Long = 1472449962698
// run all 3 inner Streams concurrently and count the merged lines
val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun
//> ss_lines : Int = 827
println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms")
//> parallel reading all files 827 total lines in 5173ms
```
The same number of lines was read in only 5173ms, about 1.8 times as fast as the earlier 9221ms.
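The speedup comes from running the three line Streams at the same time. The parallel total (5173ms) is close to the time of the slowest file alone (s1 took 5370ms), rather than the sum of all three. The sketch below is a rough standard-library analogue of join(maxOpen), not fs2 code. Each inner source, modelled as a plain Seq, runs as one task on a fixed pool of maxOpen threads. Elements go into a shared queue as they are produced, so the output order depends on scheduling. JoinSketch and joinLike are hypothetical names, and the blocking Thread.sleep stands in for the non-blocking Task.schedule.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object JoinSketch {
  // Hypothetical helper: at most maxOpen inner sources are active at once,
  // and their elements are interleaved in whatever order they are produced.
  def joinLike[A](maxOpen: Int)(inners: Seq[Seq[A]], delayMs: Long): List[A] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    val out = new ConcurrentLinkedQueue[A]()
    inners.foreach { inner =>
      pool.execute(new Runnable {
        def run(): Unit = inner.foreach { a =>
          Thread.sleep(delayMs) // simulated per-element latency (blocking, unlike Task.schedule)
          out.add(a)
        }
      })
    }
    pool.shutdown() // accept no new tasks; already submitted ones keep running
    pool.awaitTermination(1, TimeUnit.MINUTES)
    // copy the queue into an immutable List
    val it = out.iterator()
    val result = List.newBuilder[A]
    while (it.hasNext) result += it.next()
    result.result()
  }
}
```

With three sources of n elements each, running them one after another takes about 3 × n × delayMs. joinLike(3) takes roughly n × delayMs, because all three sources sleep at the same time.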
join(3)(ss) returns a single merged Stream of type Stream[Task,String]. We can count how often vowels appear in this Stream. First we design the counting function:
```scala
// is c a vowel? (uppercase only, because the text is upper-cased before filtering)
def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
//> vowls: (c: Char)Boolean
// implemented directly with the Scala standard library;
// .schedule delays each result in proportion to the line length to simulate latency
def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] =
  _.evalMap (text => Task.delay{
    text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
  }.schedule((text.length / 10).millis))  //> pipeVowlsCount: => fs2.Pipe[fs2.Task,String,Map[Char,Int]]
```
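Note that we use text => Task.delay{...}.schedule(d). We could have written text => Thread.sleep(d) instead, but that would make the code impure (and block a thread). So we use evalMap to keep the computation pure. Let's count the total number of vowels across all the strings: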
```scala
import scalaz.{Monoid}
// a Monoid[Map[Char,Int]] instance: zero is the empty map, append sums the counts per key
implicit object mapMonoid extends Monoid[Map[Char,Int]] {
  def zero: Map[Char,Int] = Map()
  def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
    (m1.keySet ++ m2.keySet).map { k =>
      (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))
    }.toMap
  }
}
val vc_start = System.currentTimeMillis    //> vc_start : Long = 1472464772465
// reading is parallel, but pipeVowlsCount handles the merged lines one at a time
val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount)
  .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
//> vowlsLine : scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A -> 2361, I -> 2031, O -> 1824)
println(s"parallel reading all files and counted vowls sequentially in ${System.currentTimeMillis - vc_start}ms")
//> parallel reading all files and counted vowls sequentially in 10466ms
```
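Here we defined mapMonoid, a Monoid[Map[Char,Int]] instance. runFold itself only needs an initial value and a combining function. mapMonoid.append serves as that function: it merges two count maps by summing the counts for each key. In this version only the reading runs in parallel. The vowel counting in pipeVowlsCount still processes the merged lines one after another.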
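The counting and merging logic can be checked without fs2. The sketch below restates vowls/fVowlsCount and mapMonoid.append as plain functions; the names VowelCountSketch, isVowel, countVowels and merge are ours. It avoids mapValues, which returns a lazily recomputed view in Scala 2.12 and is deprecated in 2.13.

```scala
object VowelCountSketch {
  // Same predicate as vowls: uppercase vowels only.
  def isVowel(c: Char): Boolean = List('A', 'E', 'I', 'O', 'U').contains(c)

  // Per-line vowel counts, as in fVowlsCount, built as a strict Map.
  def countVowels(text: String): Map[Char, Int] =
    text.toUpperCase.toList.filter(isVowel).groupBy(identity).map { case (k, v) => k -> v.size }

  // Same logic as mapMonoid.append: union of the keys, summing the counts.
  def merge(m1: Map[Char, Int], m2: Map[Char, Int]): Map[Char, Int] =
    (m1.keySet ++ m2.keySet).map(k => k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0))).toMap
}
```

merge is associative and commutative, and the empty map is its identity. As a result, the per-line maps can be folded in whatever order join delivers them, and the totals come out the same.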
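So how do we make the counting itself parallel? fs2.concurrent.join(maxOpen)(...) runs up to maxOpen of the inner Streams concurrently and merges their outputs as they arrive. To use it, we need a Stream[F,Stream[F,O]]. That means lifting each element O of the Stream[F,O] into a Stream of its own. We first write a function that turns a line into a Map[Char,Int]. Then we lift each call into a Stream[Task,Map[Char,Int]], which Stream.eval can do: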
```scala
// count the vowels in a single line
def fVowlsCount(text: String): Map[Char,Int] =
  text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
//> fVowlsCount: (text: String)Map[Char,Int]
// wrap each line's (delayed) count in a one-element Stream, giving a Stream of Streams
val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss)
  .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))}
//> parVowlsLine : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Map[Char,Int]]] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>).mapChunks(<function1>)
```
Let's check the performance:
```scala
val parvc_start = System.currentTimeMillis //> parvc_start : Long = 1472465844694
// run up to 8 of the one-element counting Streams at a time
fs2.concurrent.join(8)(parVowlsLine)
  .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
//> res0: scala.collection.immutable.Map[Char,Int] = Map(E -> 3381, U -> 838, A -> 2361, I -> 2031, O -> 1824)
println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms")
//> parallel reading all files and counted vowls in 4984ms
```
The parallel version takes only 4984ms. Doing everything sequentially would take about 10466 + (9221 - 5173) = 14514ms, so the speedup is roughly 2.9x.
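Running join(8) over one-element Streams amounts to this: count each line as its own task, at most 8 at a time, then fold the per-line maps. The sketch below is a rough standard-library analogue of that shape, not fs2 code. It uses scala.concurrent.Future on a fixed pool of maxOpen threads, with the pool size standing in for join's maxOpen. ParVowelSketch and parCount are hypothetical names. Future.sequence keeps the input order, whereas join delivers results in completion order. That difference is harmless here because the merge is commutative.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParVowelSketch {
  def countVowels(text: String): Map[Char, Int] =
    text.toUpperCase.toList.filter(c => "AEIOU".indexOf(c) >= 0)
      .groupBy(identity).map { case (k, v) => k -> v.size }

  def merge(m1: Map[Char, Int], m2: Map[Char, Int]): Map[Char, Int] =
    (m1.keySet ++ m2.keySet).map(k => k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0))).toMap

  // Counts each line in its own task on at most maxOpen threads, then folds the results.
  def parCount(lines: Seq[String], maxOpen: Int): Map[Char, Int] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val perLine: Seq[Future[Map[Char, Int]]] = lines.map(l => Future(countVowels(l)))
      val all: Future[Seq[Map[Char, Int]]] = Future.sequence(perLine)
      Await.result(all, 1.minute).foldLeft(Map.empty[Char, Int])(merge)
    } finally pool.shutdown()
  }
}
```

The parallel result should always equal the sequential fold lines.map(countVowels).foldLeft(Map.empty[Char, Int])(merge). This mirrors how both fs2 versions above print the same vowel totals.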
Here is the complete demo source code for this discussion:
```scala
import fs2._
import scala.language.{higherKinds,implicitConversions,postfixOps}
import scala.concurrent.duration._
object fs2Merge {
  // thread pools used by the concurrent combinators and by Task.schedule
  implicit val strategy = Strategy.fromFixedDaemonPool(4)
  implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
  def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a => Task.delay{ println(s"$prompt>"); a }}
  def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.evalMap { a =>
    val delay: Task[Int] = Task.delay { scala.util.Random.nextInt(max.toMillis.toInt) }
    delay.flatMap {d => Task.now(a).schedule(d.millis) }
  }

  val s1 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicBackend.scala"),1024)
  val s2 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/DatabaseConfig.scala"),1024)
  val s3 = io.file.readAll[Task](java.nio.file.Paths.get("/Users/tiger-macpro/basic/BasicProfile.scala"),1024)

  // sequential: read and count the files one after another
  val startTime = System.currentTimeMillis
  val s1lines = s1.through(text.utf8Decode).through(text.lines)
    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
  println(s"reading s1 $s1lines lines in ${System.currentTimeMillis - startTime}ms")

  val startTime2 = System.currentTimeMillis
  val s2lines = s2.through(text.utf8Decode).through(text.lines)
    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
  println(s"reading s2 $s2lines lines in ${System.currentTimeMillis - startTime2}ms")
  val startTime3 = System.currentTimeMillis
  val s3lines = s3.through(text.utf8Decode).through(text.lines)
    .through(randomDelay(10 millis)).runFold(0)((b,_) => b + 1).unsafeRun
  println(s"reading s3 $s3lines lines in ${System.currentTimeMillis - startTime3}ms")
  println(s"reading all three files ${s1lines+s2lines+s3lines} total lines in ${System.currentTimeMillis - startTime}ms")

  // parallel reading: a Stream of the three line Streams, run with join
  val lines1 = s1.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
  val lines2 = s2.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
  val lines3 = s3.through(text.utf8Decode).through(text.lines).through(randomDelay(10 millis))
  val ss: Stream[Task,Stream[Task,String]] = Stream(lines1,lines2,lines3)
  val ss_start = System.currentTimeMillis
  val ss_lines = fs2.concurrent.join(3)(ss).runFold(0)((b,_) => b + 1).unsafeRun
  println(s"parallel reading all files ${ss_lines} total lines in ${System.currentTimeMillis - ss_start}ms")

  // is c a vowel?
  def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)
  // implemented directly with the Scala standard library
  def pipeVowlsCount: Pipe[Task,String,Map[Char,Int]] =
    _.evalMap (text => Task.delay{
      text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
    }.schedule((text.length / 10).millis))

  import scalaz.{Monoid}
  // a Monoid[Map[Char,Int]] instance used with runFold
  implicit object mapMonoid extends Monoid[Map[Char,Int]] {
    def zero: Map[Char,Int] = Map()
    def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
      (m1.keySet ++ m2.keySet).map { k =>
        (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))
      }.toMap
    }
  }
  // parallel reading, sequential counting
  val vc_start = System.currentTimeMillis
  val vowlsLine = fs2.concurrent.join(3)(ss).through(pipeVowlsCount)
    .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
  println(s"parallel reading all files and counted vowls sequentially in ${System.currentTimeMillis - vc_start}ms")

  // parallel reading and parallel counting
  def fVowlsCount(text: String): Map[Char,Int] =
    text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size)
  val parVowlsLine: Stream[Task,Stream[Task,Map[Char,Int]]] = fs2.concurrent.join(3)(ss)
    .map {text => Stream.eval(Task {fVowlsCount(text)}.schedule((text.length / 10).millis))}
  val parvc_start = System.currentTimeMillis
  fs2.concurrent.join(8)(parVowlsLine)
    .runFold(Map[Char,Int]())(mapMonoid.append(_,_)).unsafeRun
  println(s"parallel reading all files and counted vowls in ${System.currentTimeMillis - parvc_start}ms")
}
```
Reposted from: https://www.cnblogs.com/tiger-xc/p/5820446.html