updateStateByKey(func)

从名字上来看,该函数会更新根据key聚合,并不断更新value值

要想使用该函数,Dstream之前的类型必须是K,V形式的二元组。
经过执行func函数后,会返回一个key的所有的聚合值得状态。

以word count为例,对于每一个批的数据进行分解聚合,会得到当前的这个批的状态,经过聚合后得到值的,假设有(word1,10),(word2,15),(word3,1);
而对于之前的累积值,(word1,100),(word2,10),(word3,15),(word4,10)
则下一步的状态值为(word1,100+10),(word2,10+15),(word3,1+15),(word4,0+10)
等待下一个批次的数据到了,继续按照这个逻辑进行处理。

代码示例:

package com.lgh.sparkstreamingimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by lgh on 2017/8/24.*/
object UpdateStateByKey {def main(args: Array[String]): Unit = {val brokers = "mtime-bigdata00:9092,mtime-bigdata01:9092";val topics = "testkafka";val batchseconds = "10";val checkpointDirectory = "./upbykey";val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createcontext(brokers, topics, batchseconds, checkpointDirectory))ssc.start()ssc.awaitTermination()}def createcontext(brokers: String, topics: String, batchseconds: String, checkpointDirectory: String): StreamingContext = {val sparkconf = new SparkConf().setAppName("TestUpStateByKey").setMaster("local[3]")val ssc = new StreamingContext(sparkconf, Seconds(batchseconds.toInt))val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers);val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)val lines: DStream[String] = messages.map(_._2)val message: DStream[(String, Long)] = lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_+_)val keyvalue=message.updateStateByKey[Long](addFunction _)keyvalue.print(100)ssc.checkpoint(checkpointDirectory)ssc}//curValues:对于每一个key的所有value值得集合Seq//preVauleState: 之前的每一个key的值对应的value值;这这个例子中是单词计数的之前的累计值//def addFunction(currValues: Seq[Long], preVauleState: Option[Long]): Option[Long] = {//计算得到当前key在这个Dstream中的值val currentSum:Long = currValues.sum//得到当前key以前的累积值val previousSum:Long = preVauleState.getOrElse(0)//计算出当前key对应的新的值val nowvalue:Long=currentSum + previousSum;//返回结果值;在scala中,Some和None是Option的子类Some(nowvalue)}}

spark-streaming 编程(五)updateStateByKey相关推荐

  1. 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)

    Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...

  2. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

  3. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  4. Spark Streaming 编程新手入门指南

    Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...

  5. Spark详解(十二):Spark Streaming原理和实现

    1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...

  6. Spark Streaming与流处理

    Spark Streaming与流处理 一.流处理 1.1 静态数据处理 在流处理之前,数据通常存储在数据库,文件系统或其他形式的存储系统中.应用程序根据需要查询数据或计算数据.这就是传统的静态数据处 ...

  7. 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

    导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...

  8. hive编程指南电子版_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  9. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  10. Spark Streaming实时数据分析

    1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...

最新文章

  1. rmdir删除文件报Permission denied in 错误
  2. django学习(1)-----项目组成
  3. rabbitmq 取消消息_认识RabbitMQ从这篇文章开始
  4. linux怎么杀死线程c语言,教程-linux下c语言编程 第一弹-线程的使用
  5. Win32项目关于MessageBox参数的详细说明
  6. 在Chrome78浏览器上如何实现自动播放音视频
  7. for update在mysql中使用
  8. Security+ 学习笔记35 配置管理
  9. 在mudbuilder上的胡扯1
  10. halcon相机标定助手_Halcon教程之单相机标定
  11. 程序员:我终于知道post和get的区别
  12. 在云测平台对手机进行兼容性测试
  13. RoboCup3D仿真2019年国赛TC笔记
  14. waves服务器系统盘,waves服务器:Waves现场声音解决方案
  15. 搜索关键字拼音智能提示实现
  16. 基础知识(三),OSI七层协议、数据传输过程、数据的封装与解封装、IP抓包分析、交换机、路由器、ARP协议、TRUNK中继、VLAN、DHCP中继、ICMP协议、三层交换机
  17. 苹果8p手机的指纹解锁为什么会失灵呢?怎么解决
  18. 香港 - 寻找轻鬆攻略游(蒲台岛)
  19. 系统开发建设要经过哪些流程?
  20. word中项目符号自动变小_HTML和Word中的项目符号点类型和创建

热门文章

  1. Windows 11正式版来了!一文带你免费升级、镜像下载、最低系统要求
  2. Tasker Android系统增强神器,Android系统增强神器 Tasker
  3. 将轻松的留给生活,沉重的留给写作——读《灯下尘》(七堇年)
  4. 2022西藏最新消防设施操作员模拟考试试题题库及答案
  5. matlab绘制动图
  6. 通俗易懂讲解 CAP理论
  7. 手机双摄像头原理及产业解析
  8. python手游自动化测试流程_基于Python+appium的ios自动化测试demo(更新中)
  9. ds5100更换电池 ibm_IBMDS5100更换电池
  10. 龙腾世纪审判一直连接服务器,【1.7.2】【rpg】我的世界龙腾世纪群组服务器