spark-streaming 编程(五)updateStateByKey
updateStateByKey(func)
从名字上来看,该函数会更新根据key聚合,并不断更新value值
要想使用该函数,Dstream之前的类型必须是K,V形式的二元组。
经过执行func函数后,会返回一个key的所有的聚合值得状态。
以word count为例,对于每一个批的数据进行分解聚合,会得到当前的这个批的状态,经过聚合后得到值的,假设有(word1,10),(word2,15),(word3,1);
而对于之前的累积值,(word1,100),(word2,10),(word3,15),(word4,10)
则下一步的状态值为(word1,100+10),(word2,10+15),(word3,1+15),(word4,0+10)
等待下一个批次的数据到了,继续按照这个逻辑进行处理。
代码示例:
package com.lgh.sparkstreamingimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by lgh on 2017/8/24.*/
object UpdateStateByKey {def main(args: Array[String]): Unit = {val brokers = "mtime-bigdata00:9092,mtime-bigdata01:9092";val topics = "testkafka";val batchseconds = "10";val checkpointDirectory = "./upbykey";val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createcontext(brokers, topics, batchseconds, checkpointDirectory))ssc.start()ssc.awaitTermination()}def createcontext(brokers: String, topics: String, batchseconds: String, checkpointDirectory: String): StreamingContext = {val sparkconf = new SparkConf().setAppName("TestUpStateByKey").setMaster("local[3]")val ssc = new StreamingContext(sparkconf, Seconds(batchseconds.toInt))val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers);val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)val lines: DStream[String] = messages.map(_._2)val message: DStream[(String, Long)] = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_+_)val keyvalue=message.updateStateByKey[Long](addFunction _)keyvalue.print(100)ssc.checkpoint(checkpointDirectory)ssc}//curValues:对于每一个key的所有value值得集合Seq//preVauleState: 之前的每一个key的值对应的value值;这这个例子中是单词计数的之前的累计值//def addFunction(currValues: Seq[Long], preVauleState: Option[Long]): Option[Long] = {//计算得到当前key在这个Dstream中的值val currentSum:Long = currValues.sum//得到当前key以前的累积值val previousSum:Long = preVauleState.getOrElse(0)//计算出当前key对应的新的值val nowvalue:Long=currentSum + previousSum;//返回结果值;在scala中,Some和None是Option的子类Some(nowvalue)}}
spark-streaming 编程(五)updateStateByKey相关推荐
- 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)
Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...
- Spark Streaming 编程指南[中英对照]
2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...
- Spark大数据分析与实战:Spark Streaming编程初级实践
Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...
- Spark Streaming 编程新手入门指南
Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...
- Spark详解(十二):Spark Streaming原理和实现
1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...
- Spark Streaming与流处理
Spark Streaming与流处理 一.流处理 1.1 静态数据处理 在流处理之前,数据通常存储在数据库,文件系统或其他形式的存储系统中.应用程序根据需要查询数据或计算数据.这就是传统的静态数据处 ...
- 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)
导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...
- hive编程指南电子版_第三篇|Spark SQL编程指南
在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...
- spark-sql建表语句限制_第三篇|Spark SQL编程指南
在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...
- Spark Streaming实时数据分析
1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...
最新文章
- rmdir删除文件报Permission denied in 错误
- django学习(1)-----项目组成
- rabbitmq 取消消息_认识RabbitMQ从这篇文章开始
- linux怎么杀死线程c语言,教程-linux下c语言编程 第一弹-线程的使用
- Win32项目关于MessageBox参数的详细说明
- 在Chrome78浏览器上如何实现自动播放音视频
- for update在mysql中使用
- Security+ 学习笔记35 配置管理
- 在mudbuilder上的胡扯1
- halcon相机标定助手_Halcon教程之单相机标定
- 程序员:我终于知道post和get的区别
- 在云测平台对手机进行兼容性测试
- RoboCup3D仿真2019年国赛TC笔记
- waves服务器系统盘,waves服务器:Waves现场声音解决方案
- 搜索关键字拼音智能提示实现
- 基础知识(三),OSI七层协议、数据传输过程、数据的封装与解封装、IP抓包分析、交换机、路由器、ARP协议、TRUNK中继、VLAN、DHCP中继、ICMP协议、三层交换机
- 苹果8p手机的指纹解锁为什么会失灵呢?怎么解决
- 香港 - 寻找轻鬆攻略游(蒲台岛)
- 系统开发建设要经过哪些流程?
- word中项目符号自动变小_HTML和Word中的项目符号点类型和创建
热门文章
- Windows 11正式版来了!一文带你免费升级、镜像下载、最低系统要求
- Tasker Android系统增强神器,Android系统增强神器 Tasker
- 将轻松的留给生活,沉重的留给写作——读《灯下尘》(七堇年)
- 2022西藏最新消防设施操作员模拟考试试题题库及答案
- matlab绘制动图
- 通俗易懂讲解 CAP理论
- 手机双摄像头原理及产业解析
- python手游自动化测试流程_基于Python+appium的ios自动化测试demo(更新中)
- ds5100更换电池 ibm_IBMDS5100更换电池
- 龙腾世纪审判一直连接服务器,【1.7.2】【rpg】我的世界龙腾世纪群组服务器