2. 运算

2.1. StreamingContext.checkPoint

    val ssc = new StreamingContext(sc, Seconds(10))val checkpointDir = "hdfs://dir/checkpoint"ssc.checkpoint(checkpointDir)

1、为Spark Streaming设置checkpoint,在使用DStream.updateByKey前必须先设置checkpoint
2、在checkpointDir目录下会生成形如checkpoint-1454403510000和checkpoint-1454403510000.bk的文件和1f355a78-5404-4b47-9755-8529d15e9037的文件夹
3、checkpoint-1454403510000和checkpoint-1454403510000.bk内容一样,很明显1454403510000是时间戳,打开可以看见形如update hdfs://dir/1f355a78-5404-4b47-9755-8529d15e9037/rdd-87的语句,说明把当前状态存入了rdd-87中

2.2. DStream.updateByKey

    val streamPath = "hdfs://dir/1"val words: DStream[(String, Int)] = ssc.textFileStream(streamPath).flatMap(_.split(" ")).map(x => (x, 1))//为每个word转换为(word,1)的tupleval wordCounts = words.updateStateByKey[Int](updateFunc = (sumNow: Seq[Int], sumLast: Option[Int]) => {Some(sumNow.sum + sumLast.getOrElse(0))})wordCounts.print()ssc.start()ssc.awaitTermination()

updateByKey的官网解释
1、updateByKey是PairDStreamFunctions的算子,必须是DStream[(Key, Value)]才能使用。
2、updateByKey最长接受的参数如下:

    def updateStateByKey[S](updateFunc: (Iterator[(K, Seq[V], Option[S])]) ⇒ Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean, initialRDD: RDD[(K, S)])(implicit arg0: ClassTag[S]): DStream[(K, S)]

最短的如下:

    def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) ⇒ Option[S])(implicit arg0: ClassTag[S]): DStream[(K, S)]

其中最重要的是updateFunc: (Seq[V], Option[S]) ⇒ Option[S]),它来说明如何更新状态(state)

    val wordCounts = words.updateStateByKey[Int](updateFunc = (sumNow: Seq[Int], sumLast: Option[Int]) => {Some(sumNow.sum + sumLast.getOrElse(0))})

1、假设words中是形如DStream((“hello”,1),(“hello”,1),(“hello”,1),(“hello”,1),(“hello”,1),(“spark”,1),(“spark”,1))
2、对于Key = “hello”来说updateFunc的输入参数sumNow就是Seq(1,1,1,1,1),sumLast是上个状态”hello”的Value值——空值;输出就是本状态”hello”的Value值——Some(1+1+1+1+1+0) = Some(5)。
3、最终wordCounts就是形如DStream((“hello”,5),(“spark”,2))

2.2.1. 期望只统计最近2个state的数据

val input = ssc.textFileStream(streamPath)
val words = input.flatMap(_.split(" ")).map(x => (x, Array(1,0)))val wordCounts = words.updateStateByKey[Array[Int]](updateFunc = (countNow: Seq[Array[Int]], stateLast: Option[Array[Int]]) => {val sumNow: Int = countNow.map(_(0)).sumval sumLast: Int = stateLast.getOrElse(Array(0,0))(0)Some(Array(sumNow,sumLast))})wordCounts.map(x => (x._1, x._2.sum)).print()

2.2.2. 期望定义state’有新文件才生成新的state’,而不是目前每10秒一个新的state

1、尚未发现有API支持这一点
2、尚未发现有方法判定当前DStream为空的API
3、尚未发现有API可将DStream => 除了DStream、Unit和StreamingContext之外的数据结构
4、尝试采用如下语句判定:

    var flag = trueval input = ssc.textFileStream(streamPath)input.foreachRDD(x => {if (x.isEmpty()) flag = false else flag = true})val words = input.flatMap(_.split(" ")).map(x => (x, Array(1,0)))val wordCounts = words.updateStateByKey[Array[Int]](updateFunc = (countNow: Seq[Array[Int]], stateLast: Option[Array[Int]]) => {if (flag) {val sumNow: Int = countNow.map(_(0)).sumval sumLast: Int = stateLast.getOrElse(Array(0,0))(0)Some(Array(sumNow,sumLast))} else {stateLast}})wordCounts.map(x => (x._1, x._2.sum)).print()

结果Spark-shell中报错:
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
2016.2.17日更新:Spark-submit成功!

2.3. DStream.reduceByKeyAndWindow

业务场景:
1、约每5分钟生成一个源文件放在hdfs://dir中,该文件代表1分钟内收到的所有单词
2、统计当前所有单词的出现次数
3、该单词如果40分钟内未出现,既40个文件中未出现则认为消失,不出现在统计结果中
4、要求每10秒扫描一次文件夹
由于2.2.2的问题没有解决,也就是说Spark Streaming每10秒一个state,而不是每个新文件一个state。由于累加结果有40分钟过期时间,因此无法用单纯的updateByKey算子来计算。尝试采用DStream.reduceByKeyAndWindow算子来替代。
采用2.2.2的方式实现

Spark Streaming学习与实践(2)相关推荐

  1. Spark学习笔记(8)---Spark Streaming学习笔记

    Spark Streaming学习笔记 同Spark SQL一样,Spark Streaming学习也是放在了github https://github.com/yangtong123/RoadOfS ...

  2. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  3. pythonspark实践_基于Python的Spark Streaming Kafka编程实践

    版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...

  4. Spark Streaming学习笔记

    特点: Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性. Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Tw ...

  5. Spark Streaming实践和优化

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...

  6. Spark Streaming高级特性在NDCG计算实践

    从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方 ...

  7. StreamDM:基于Spark Streaming、支持在线学习的流式分析算法引擎

    StreamDM:基于Spark Streaming.支持在线学习的流式分析算法引擎 streamDM:Data Mining for Spark Streaming,华为诺亚方舟实验室开源了业界第一 ...

  8. Spark学习之Spark Streaming(9)

    Spark学习之Spark Streaming(9) 1. Spark Streaming允许用户使用一套和批处理非常接近的API来编写流式计算应用,这就可以大量重用批处理应用的技术甚至代码. 2. ...

  9. 大数据学习15之spark streaming入门

    文章目录 一.概述 二.应用场景 三.集成Spark生态系统的使用 四.发展史 五.从词频统计功能着手入门 1.spark-submit执行 2.spark-shell执行(测试时使用) 六.工作原理 ...

最新文章

  1. Android 中Message,MessageQueue,Looper,Handler详解+实例
  2. java ranger rest_kafka ranger integration issuse
  3. 【LeetCode笔记】3. 无重复字符的最长子串(JAVA、滑动窗口、字符串)
  4. linux图形环境小记
  5. 手机老是应用无响应是怎么回事?
  6. 苹果发布 Safari 技术预览版 131,其中包含错误修复和性能改进
  7. flash mx拖拽实例_Flash MX 2004中的像素溶解效果
  8. 开发的免费Windows 8 应用程序
  9. java 认证 种类_java认证:JavaSocket编程的一个秘密类
  10. 电竞帮服务器未响应,电竞显示器ips面板1ms响应时间?被蒙骗是因为你还不了解MPRT技术!...
  11. 马云:眼光有多远 未来就有多远【2014世界互联网大会】
  12. LeetCode781森林中的兔子题解
  13. Java后台获取USB二维码扫描枪内容(Java监听系统键盘操作)
  14. 消灭星星android,消灭星星安卓版
  15. 光年(Light Year Admin)后台管理系统模板
  16. 『注册中心』Consul微服务注册中心的使用及相关集群搭建
  17. 【404 App】2.0全新版本正式来袭之ALL模块。
  18. Linux | 第一篇——常见指令汇总【超全、超详细讲解】
  19. Unity旋转之四元数(开关车门,第一人称控制器)
  20. 爬虫配套学习-前端学习笔记04-表格

热门文章

  1. c语言程序设计教程 传智,c语言程序设计教程传智播客 答案
  2. 面试题: Docker的优缺点
  3. windows 捕获 扬声器声音 pcm_资历深厚专业声音设计师这样评价 Zoom F6 多轨影视级录音机...
  4. 心动页面html,心动模式播放页.html
  5. pytest-fixture应用
  6. 把NT“赶尽杀绝”攻击NT的一些技术(转)
  7. Python异步编程详解
  8. ubuntu安装gcc-7,g++-7,源码安装python
  9. 青提WIFI代理系统开发
  10. 还在用print()查找错误?日志消息这顿排骨它不香嘛?