D32 SparkStreaming
一、SparkStreaming概述
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
对数据的操作也是按照RDD为单位来进行的
计算过程由Spark engine来完成
二、SparkStreaming的简单实例
实例①:截取端口数据
nc -lk 8888 开启linux的8888端口输入数据,DStream获取输出
//StreamingContext
val conf =newSparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
val sc =newSparkContext(conf)
val ssc =newStreamingContext(sc,Seconds(5))
//接收数据
val ds = ssc.socketTextStream("server",8888)//从socket端口拉取数据;返回的是DStream 离散化数据流
//DStream是一个特殊的RDD
//hello tom hello jerry
//ds即是获取的该端口的数据
val result = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//操作的是该批次batches,该批次累计加
//打印结果
result.print()
ssc.start()
ssc.awaitTermination()
val updateFunc =(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
iter.map{case(word, current_count, history_count)=>(word, current_count.sum + history_count.getOrElse(0))}
}
def main(args:Array[String]){
LoggerLevels.setStreamingLogLevels()
//StreamingContext
val conf =newSparkConf().setAppName("StateFulWordCount").setMaster("local[2]")
val sc =newSparkContext(conf)
//updateStateByKey必须设置setCheckpointDir
sc.setCheckpointDir("c://ck")
- val ssc =newStreamingContext(sc,Seconds(5))
- val ds = ssc.socketTextStream("192.168.137.10",8888)
//DStream是一个特殊的RDD
//hello tom hello jerry
val result = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc,newHashPartitioner(sc.defaultParallelism),true)
- result.print()
- ssc.start()
- ssc.awaitTermination()
}
以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加
在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步:
1) 定义状态:可以是任意数据类型
2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。 updateFunc
对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。
实例③sparkStreaming结合flume
val host = args(0)
val port = args(1).toInt
LoggerLevels.setStreamingLogLevels()
val conf =newSparkConf().setAppName("FlumeWordCount").setMaster("local[2]")//!!!选择是否本地,本地的时候带有local,集群填写集群的地址
val ssc =newStreamingContext(conf,Seconds(5))
//推送方式: flume向spark发送数据
val flumeStream =FlumeUtils.createStream(ssc, host, port)//集群中flume设置了数据的输出端口
//flume中的数据通过event.getBody()才能拿到真正的内容
val words = flumeStream.flatMap(x =>newString(x.event.getBody().array()).split(" ")).map((_,1))
val results = words.reduceByKey(_ + _)
results.print()
ssc.start()
ssc.awaitTermination()
备注:课时出错,setMaster()的设置集群还是本地模式出错要设置明确是集群还是本地
拉取数据与推送数据区别:flume创建的流不同
采用推模式:推模式的理解就是Flume作为缓存,存有数据。监听对应端口,如果服务可以链接,就将数据push过去。(简单,耦合要低),缺点是SparkStreaming 程序没有启动的话,Flume端会报错,同时可能会导致Spark Streaming 程序来不及消费的情况。flume主动推送,SparkStreaming被动接收
采用拉模式:拉模式就是自己定义一个sink,SparkStreaming自己去channel里面取数据,根据自身条件去获取数据,稳定性好。SparkStreaming主动去拉去
实例④
val updateFunc =(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap {case(x, y, z)=>Some(y.sum + z.getOrElse(0)).map(i =>(x, i))}
}
def main(args:Array[String]){
LoggerLevels.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads)= args
val sparkConf =newSparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc =newStreamingContext(sparkConf,Seconds(5))
ssc.checkpoint("c://ck2")//sparkStreaming的设置checkpoint与sparkContext不同
- //获取该topic下的数据
- val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//获取流
val data =KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,StorageLevel.MEMORY_AND_DISK_SER)
val words = data.map(_._2).flatMap(_.split(" "))
val WordCounts= words.map((_,1)).updateStateByKey(updateFunc,newHashPartitioner(ssc.sparkContext.defaultParallelism),true)
WordCounts.print()
ssc.start()
ssc.awaitTermination()
}
备注:Kafka和Flume两个框架和SparkStreaming来进行实时数据流通,都是通过内部创建流
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
val flumeStream =FlumeUtils.createStream(ssc, host, port)
Spark的实时计算用SparkStreaming;离线计算用SparkContext。入参都有conf,一个含延时,另一个不含延时
补充:设置输出日志信息
只需在该main方法中执行:LoggerLevels.setStreamingLogLevels()
设置后,输出的信息,只有打印出来的数据信息
object LoggerLevelsextendsLogging{
def setStreamingLogLevels(){
val log4jInitialized =Logger.getRootLogger.getAllAppenders.hasMoreElements
if(!log4jInitialized){
logInfo("Setting log level to [WARN] for streaming example."+
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
D32 SparkStreaming相关推荐
- 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 API 注意 代码实现-自动提交偏移量到默认主题 代码实现- ...
- 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...
- 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...
- 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...
- 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...
- 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...
- 2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
目录 SparkStreaming数据抽象-DStream DStream 是什么 DStream Operations Transformation Output函数 SparkStreaming数 ...
- 第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错...
本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming容错架构和运行机制 理解SparkStreaming的Job的整个架构和运行机制对于精通Spar ...
- SparkStreaming靠什么征服了字节跳动?
随着信息技术的迅猛发展以及数据量的爆炸式增长,数据的种类与变化速度促使人们对大数据处理提出了更高的要求.但传统的批处理框架却一直难以满足各个领域中的实时性需求. Spark--实现大数据的流式处理 S ...
最新文章
- html5简单拖拽实现自动左右贴边+幸运大转盘
- NEFU 1146 又见A+B
- NOJ——1672剪绳子(博弈)
- 经典面试题:用typeof来判断对象的潜在陷阱
- 近期计算机视觉相关算法竞赛汇总—总奖池超553万人民币
- 折半查找的思想及源码_二分查找及对应的几道经典题目
- 【对讲机的那点事】解读无管局《回答》:充分理解物联网产业诉求,值得点赞!...
- android DisplayMetrics
- 软件测试 - V模型、W模型、H模型、X模型
- Android viewpager + fragment实现fragment之间的切换
- 阿里云宗志刚:云网一体,新一代洛神云网络平台
- 互联网金融一:大额支付系统、小额支付系统介绍
- ALSA 音频工具 amixer、aplay、arecord
- 播布客学习视频_C学习笔记_simple
- 输入一个数字n,输出一个n层的特定三角形
- 自然语言处理--MM、RMM算法及Python 复习
- ServU 5.0的配置
- 20170909深度学习solar测试日志
- 想起了三联书店 [戴文葆]
- Mac 系统不兼容移动硬盘无法识别怎么办