mapWithState的用法

message.mapWithState(StateSpec.function(func).initialState(RDD).timeout(time))

需要自己写一个匿名函数func来实现自己想要的功能。如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值。
另外,还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在func中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。

代码示例:

package com.lgh.sparkstreamingimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtilsimport scala.collection.mutable.Set/*** Created by lgh on 2017/8/24.*/
object mapWithState {def main(args: Array[String]): Unit = {val brokers = "mtime-bigdata00:9092,mtime-bigdata01:9092";val topics = "testkafka";val batchseconds = "10";val checkpointDirectory = "./mapwithstate";val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createcontext(brokers, topics, batchseconds, checkpointDirectory))ssc.start()ssc.awaitTermination()}def createcontext(brokers: String, topics: String, batchseconds: String, checkpointDirectory: String): StreamingContext = {val sparkconf = new SparkConf().setAppName("TestUpStateByKey").setMaster("local[3]")val ssc = new StreamingContext(sparkconf, Seconds(batchseconds.toInt))val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)val lines: DStream[String] = messages.map(_._2)val message: DStream[(String, Long)] = lines.flatMap(_.split(" ")).map(x=>(x,1l)).reduceByKey(_+_)//匿名函数val logical = (key: String, value: Option[Long], state: State[Long])=>{//这个的作用是检测已经过期的key并移除;如果有key过期后又有这个key新的数据进来,不加isTimeout的话就会导致报错if(state.isTimingOut()){System.out.println(key+" is timingout")}else {val sum = state.getOption().getOrElse(0l) + value.getOrElse(0l)val output = (key, sum)//更新状态state.update(sum)output}}val keyvalue=message.mapWithState(StateSpec.function(logical).timeout(Seconds(60)))keyvalue.stateSnapshots().foreachRDD((rdd,time)=>{println("========"+rdd.count())rdd.foreach( x=>println(x._1+"="+x._2))})ssc.checkpoint(checkpointDirectory)ssc}}

spark-streaming 编程(六)mapwithState相关推荐

  1. 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)

    Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...

  2. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

  3. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  4. Spark Streaming 编程新手入门指南

    Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...

  5. sparkstreaming监听hdfs目录如何终止_四十六、Spark Streaming简介及入门

    1.什么是Spark Streaming Spark Streaming是基于Spark Core之间的实时计算框架,可以从很多数据源消费数据并对数据进行处理.它是Spark核心API的一个扩展与封装 ...

  6. 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

    导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...

  7. hive编程指南电子版_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  8. Spark详解(十二):Spark Streaming原理和实现

    1 简介 SparkStreaming是Spark核心API的一个扩展,具有高吞吐量和容错能力的实时流数据处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字 ...

  9. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  10. Spark Streaming实时数据分析

    1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...

最新文章

  1. 【CentOS 7MySQL常用操作3】,MySQL常用命令#180113
  2. java初始化例子_Java 非静态初始化的例子
  3. mybatis 动态传入表名 注解_Mybatis动态sql的动态表名问题
  4. QML基础——在C++程序中使用QML
  5. Effective_STL 学习笔记(二十七) 用 distance 和 advance 把 const_iterator 转化成 iterator...
  6. 手机 html 折叠效果,HTML5仿苹果手机的面板合拢折叠效果
  7. 【今日CS 视觉论文速览】19 Dec 2018
  8. 北林oj-算法设计与分析-Line up in the canteen(两种解法,附思路)
  9. logback error 分开存日志
  10. 弯管机编程软件电脑版_乐高Wedo2.0电脑版下载
  11. 关于U盘被写保护无法格式化的解决方法
  12. Ubuntu16.04/linux系统旋转显示器屏幕即竖屏显示(亲测)
  13. ale_python_interface安装操作
  14. int不是默认为0吗?为什么会提示要初始化?
  15. JSOI2014骑士游戏(最短路)
  16. 使用Sharding-Proxy完成mysql分库分表和主从复制
  17. Pygame 官方文档 - 中译版
  18. bootstrap datetimepicker 用法+demo案例下载
  19. 【HTML CSS JS 实现QQ2009界面——附源代码】
  20. 滑块验证码 python

热门文章

  1. 聚沙成塔--爬虫系列一(环境,开发工具搭建)
  2. phyton方面相关书籍
  3. 学phyton第一天
  4. oracle 三表连接 join,三个表innerjoin 如何用inner join关联三张表
  5. java 中文星期表示_java之获得中文星期几
  6. c#中控制不能从一个case标签贯穿到另一个case标签是啥意思
  7. 手写jQuery轮播图插件,即拿即用,更多接口,更少代码实现你想要的轮播图~~
  8. LeetCode 2300. 咒语和药水的成功对数
  9. 微分中的dx和delta x
  10. JavaScript使用手册(一)