一、SparkStreaming概述

Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

对数据的操作也是按照RDD为单位来进行的

计算过程由Spark engine来完成

二、SparkStreaming的简单实例

实例①:截取端口数据

nc -lk 8888 开启linux的8888端口输入数据,DStream获取输出

  1. //StreamingContext
  2. val conf =newSparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
  3. val sc =newSparkContext(conf)
  4. val ssc =newStreamingContext(sc,Seconds(5))
  5. //接收数据
  6. val ds = ssc.socketTextStream("server",8888)//从socket端口拉取数据;返回的是DStream 离散化数据流
  7. //DStream是一个特殊的RDD
  8. //hello tom hello jerry
  9.  
  10. //ds即是获取的该端口的数据
  11. val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//操作的是该批次batches,该批次累计加
  12. //打印结果
  13.  
  14. result.print()
  15. ssc.start()
  16. ssc.awaitTermination()
实例②:截取端口数据(含历史数据)累加计算
  1. val updateFunc =(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
  2. iter.map{case(word, current_count, history_count)=>(word, current_count.sum + history_count.getOrElse(0))}
  3. }
  4. def main(args:Array[String]){
  5. LoggerLevels.setStreamingLogLevels()
  6. //StreamingContext
  7. val conf =newSparkConf().setAppName("StateFulWordCount").setMaster("local[2]")
  8. val sc =newSparkContext(conf)
  9. //updateStateByKey必须设置setCheckpointDir
  10. sc.setCheckpointDir("c://ck")
  11. val ssc =newStreamingContext(sc,Seconds(5))
  12. val ds = ssc.socketTextStream("192.168.137.10",8888)
  13. //DStream是一个特殊的RDD
  14. //hello tom hello jerry
  15. val result = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc,newHashPartitioner(sc.defaultParallelism),true)
  16. result.print()
  17. ssc.start()
  18. ssc.awaitTermination()
  19. }
updateStateByKey 解释: 
以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加 
在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步: 
1) 定义状态:可以是任意数据类型 
2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。 updateFunc
对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。

实例③sparkStreaming结合flume

  1. val host = args(0)
  2. val port = args(1).toInt
  3. LoggerLevels.setStreamingLogLevels()
  4. val conf =newSparkConf().setAppName("FlumeWordCount").setMaster("local[2]")//!!!选择是否本地,本地的时候带有local,集群填写集群的地址
  5. val ssc =newStreamingContext(conf,Seconds(5))
  6. //推送方式: flume向spark发送数据
  7. val flumeStream =FlumeUtils.createStream(ssc, host, port)//集群中flume设置了数据的输出端口
  8. //flume中的数据通过event.getBody()才能拿到真正的内容
  9. val words = flumeStream.flatMap(x =>newString(x.event.getBody().array()).split(" ")).map((_,1))
  10. val results = words.reduceByKey(_ + _)
  11. results.print()
  12. ssc.start()
  13. ssc.awaitTermination()
linux上flume设置文件夹监听,一方面写入数据,监测数据的输出
flume的sink设置IP和端口,要和创建的flumeStream流端口一致

备注:课时出错,setMaster()的设置集群还是本地模式出错要设置明确是集群还是本地

拉取数据与推送数据区别:flume创建的流不同

采用推模式:推模式的理解就是Flume作为缓存,存有数据。监听对应端口,如果服务可以链接,就将数据push过去。(简单,耦合要低),缺点是SparkStreaming 程序没有启动的话,Flume端会报错,同时可能会导致Spark Streaming 程序来不及消费的情况。flume主动推送,SparkStreaming被动接收

采用拉模式:拉模式就是自己定义一个sink,SparkStreaming自己去channel里面取数据,根据自身条件去获取数据,稳定性好。SparkStreaming主动去拉去

实例④

  1. val updateFunc =(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
  2. //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
  3. iter.flatMap {case(x, y, z)=>Some(y.sum + z.getOrElse(0)).map(i =>(x, i))}
  4. }
  5. def main(args:Array[String]){
  6. LoggerLevels.setStreamingLogLevels()
  7. val Array(zkQuorum, group, topics, numThreads)= args
  8. val sparkConf =newSparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
  9. val ssc =newStreamingContext(sparkConf,Seconds(5))
  10. ssc.checkpoint("c://ck2")//sparkStreaming的设置checkpoint与sparkContext不同
  11. //获取该topic下的数据
  12. val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  13.  //获取流
  14. val data =KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,StorageLevel.MEMORY_AND_DISK_SER)
  15.  
  16. val words = data.map(_._2).flatMap(_.split(" "))
  17. val WordCounts= words.map((_,1)).updateStateByKey(updateFunc,newHashPartitioner(ssc.sparkContext.defaultParallelism),true)
  18. WordCounts.print()
  19. ssc.start()
  20. ssc.awaitTermination()
  21. }

备注:Kafka和Flume两个框架和SparkStreaming来进行实时数据流通,都是通过内部创建流

val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
val flumeStream =FlumeUtils.createStream(ssc, host, port)

Spark的实时计算用SparkStreaming;离线计算用SparkContext。入参都有conf,一个含延时,另一个不含延时

补充:设置输出日志信息

只需在该main方法中执行:LoggerLevels.setStreamingLogLevels()

设置后,输出的信息,只有打印出来的数据信息

  1. object LoggerLevelsextendsLogging{
  2. def setStreamingLogLevels(){
  3. val log4jInitialized =Logger.getRootLogger.getAllAppenders.hasMoreElements
  4. if(!log4jInitialized){
  5. logInfo("Setting log level to [WARN] for streaming example."+
  6. " To override add a custom log4j.properties to the classpath.")
  7. Logger.getRootLogger.setLevel(Level.WARN)
  8. }
  9. }
  10. }

D32 SparkStreaming相关推荐

  1. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  2. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  3. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD

    目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...

  4. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  5. 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

    目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...

  6. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

  7. 2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream

    目录 SparkStreaming数据抽象-DStream DStream 是什么 DStream Operations Transformation Output函数 SparkStreaming数 ...

  8. 第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错...

    本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming容错架构和运行机制 理解SparkStreaming的Job的整个架构和运行机制对于精通Spar ...

  9. SparkStreaming靠什么征服了字节跳动?

    随着信息技术的迅猛发展以及数据量的爆炸式增长,数据的种类与变化速度促使人们对大数据处理提出了更高的要求.但传统的批处理框架却一直难以满足各个领域中的实时性需求. Spark--实现大数据的流式处理 S ...

最新文章

  1. html5简单拖拽实现自动左右贴边+幸运大转盘
  2. NEFU 1146 又见A+B
  3. NOJ——1672剪绳子(博弈)
  4. 经典面试题:用typeof来判断对象的潜在陷阱
  5. 近期计算机视觉相关算法竞赛汇总—总奖池超553万人民币
  6. 折半查找的思想及源码_二分查找及对应的几道经典题目
  7. 【对讲机的那点事】解读无管局《回答》:充分理解物联网产业诉求,值得点赞!...
  8. android DisplayMetrics
  9. 软件测试 - V模型、W模型、H模型、X模型
  10. Android viewpager + fragment实现fragment之间的切换
  11. 阿里云宗志刚:云网一体,新一代洛神云网络平台
  12. 互联网金融一:大额支付系统、小额支付系统介绍
  13. ALSA 音频工具 amixer、aplay、arecord
  14. 播布客学习视频_C学习笔记_simple
  15. 输入一个数字n,输出一个n层的特定三角形
  16. 自然语言处理--MM、RMM算法及Python 复习
  17. ServU 5.0的配置
  18. 20170909深度学习solar测试日志
  19. 想起了三联书店 [戴文葆]
  20. Mac 系统不兼容移动硬盘无法识别怎么办

热门文章

  1. 最新多传感器融合基准 | Argoverse 2:用于感知和预测的下一代数据集
  2. 同相比造句_对比造句_造句大全
  3. 微信小程序开发从入门到精通
  4. 用心整理值得收藏的30道Python练手题(附详细答案)
  5. GNU Global 教程
  6. Java实现石头剪刀布游戏
  7. CSS 揭秘-阅读笔记:(Ch5-Ch6)
  8. 树莓派02 ------ 内核编译、移植
  9. Verilog系统函数—随机数产生
  10. iOS应用内支付(IAP)的注意事项