概述每一个Spark Streaming应用,正常来说,都是要7 * 24小时运转的,这就是实时计算程序的特点。因为要持续不断的对数据进行计算。因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错。

如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复。有两种数据需要被进行checkpoint:元数据checkpoint——将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS。当运行Spark Streaming应用程序的Driver进程所在节点失败时,该信息可以用于进行恢复。元数据信息包括了:

1.1 配置信息——创建Spark Streaming应用程序的配置信息,比如SparkConf中的信息。

1.2 DStream的操作信息——定义了Spark Stream应用程序的计算逻辑的DStream操作信息。

1.3 未处理的batch信息——那些job正在排队,还没处理的batch信息。

数据checkpoint——将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。

对于一些将多个batch的数据进行聚合的,有状态的transformation操作,这是非常有用的。在这种transformation操作中,生成的RDD是依赖于之前的batch的RDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长。

要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中间产生的RDD,会定期地被checkpoint到可靠的存储系统上,比如HDFS。从而削减RDD的依赖链条,进而缩短失败恢复时,RDD的恢复时间。

一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDD checkpoint主要是为了,使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复。

何时启用Checkpoint机制?使用了有状态的transformation操作——比如updateStateByKey,或者reduceByKeyAndWindow操作,被使用了,那么checkpoint目录要求是必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint。

要保证可以从Driver失败中进行恢复——元数据checkpoint需要启用,来进行这种情况的恢复。要注意的是,并不是说,所有的Spark Streaming应用程序,都要启用checkpoint机制,如果即不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint。事实上,这么做反而是有助于提升性能的。

对于有状态的transformation操作,启用checkpoint机制,定期将其生产的RDD数据checkpoint,是比较简单的。

可以通过配置一个容错的、可靠的文件系统(比如HDFS)的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。使用StreamingContext的checkpoint()方法即可。然后,你就可以放心使用有状态的transformation操作了。

如果为了要从Driver失败中进行恢复,那么启用checkpoint机制,是比较复杂的。需要改写Spark Streaming应用程序。

当应用程序第一次启动的时候,需要创建一个新的StreamingContext,并且调用其start()方法,进行启动。当Driver从失败中恢复过来时,需要从checkpoint目录中记录的元数据中,恢复出来一个StreamingContext。JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {

@Override

public JavaStreamingContext create() {

JavaStreamingContext jssc = new JavaStreamingContext(...);

JavaDStream lines = jssc.socketTextStream(...);

jssc.checkpoint(checkpointDirectory);

return jssc;

}

};

JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

context.start();

context.awaitTermination();def functionToCreateContext(): StreamingContext = {

val ssc = new StreamingContext(...)

val lines = ssc.socketTextStream(...)

ssc.checkpoint(checkpointDirectory)

ssc

}

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

context.start()

context.awaitTermination()

配置spark-submit提交参数按照上述方法,进行Spark Streaming应用程序的重写后,当第一次运行程序时,如果发现checkpoint目录不存在,那么就使用定义的函数来第一次创建一个StreamingContext,并将其元数据写入checkpoint目录;当从Driver失败中恢复过来时,发现checkpoint目录已经存在了,那么会使用该目录中的元数据创建一个StreamingContext。

但是上面的重写应用程序的过程,只是实现Driver失败自动恢复的第一步。第二步是,必须确保Driver可以在失败时,自动被重启。

要能够自动从Driver失败中恢复过来,运行Spark Streaming应用程序的集群,就必须监控Driver运行的过程,并且在它失败时将它重启。对于Spark自身的standalone模式,需要进行一些配置去supervise driver,在它失败时将其重启。

首先,要在spark-submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver;但是,要能够自动重启Driver,就必须将其值设置为cluster;此外,需要添加--supervise参数。

使用上述第二步骤提交应用之后,就可以让driver在失败时自动被重启,并且通过checkpoint目录的元数据恢复StreamingContext。

checkpoint的说明将RDD checkpoint到可靠的存储系统上,会耗费很多性能。当RDD被checkpoint时,会导致这些batch的处理时间增加。因此,checkpoint的间隔,需要谨慎的设置。对于那些间隔很多的batch,比如1秒,如果还要执行checkpoint操作,则会大幅度削减吞吐量。而另外一方面,如果checkpoint操作执行的太不频繁,那就会导致RDD的lineage变长,又会有失败恢复时间过长的风险。

对于那些要求checkpoint的有状态的transformation操作,默认的checkpoint间隔通常是batch间隔的数倍,至少是10秒。使用DStream的checkpoint()方法,可以设置这个DStream的checkpoint的间隔时长。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5~10倍,是个不错的选择。

java checkpoint_Checkpoint机制相关推荐

  1. java事件处理模型_从零开始理解JAVA事件处理机制(3)

    我们连续写了两小节的教师-学生的例子,必然觉得无聊死了,这样的例子我们就是玩上100遍,还是不知道该怎么写真实的代码.那从本节开始,我们开始往真实代码上面去靠拢. 事件最容易理解的例子是鼠标事件:我们 ...

  2. 两道面试题,带你解析Java类加载机制

    2019独角兽企业重金招聘Python工程师标准>>> 在许多Java面试中,我们经常会看到关于Java类加载机制的考察,例如下面这道题: class Grandpa {static ...

  3. Java类加载机制详解【java面试题】

    Java类加载机制详解[java面试题] (1)问题分析: Class文件由类装载器装载后,在JVM中将形成一份描述Class结构的元信息对象,通过该元信息对象可以获知Class的结构信息:如构造函数 ...

  4. 利用java反射机制 读取配置文件 实现动态类载入以及动态类型转换

    作者:54dabang 在spring的学习过程之中,我们能够看出通过配置文件来动态管理bean对象的优点(松耦合 能够让零散部分组成一个总体,而这些总体并不在意之间彼此的细节,从而达到了真正的物理上 ...

  5. Struts2中action接收参数的三种方法及ModelDriven跟Preparable接口结合JAVA反射机制的灵活用法...

    Struts2中action接收参数的三种方法及ModelDriven跟Preparable接口结合JAVA反射机制的灵活用法 www.MyException.Cn   发布于:2012-09-15 ...

  6. 谈谈 Java 类加载机制

    点击上方"方志朋",选择"置顶或者星标" 你的关注意义重大! 来源:Rainstorm , github.com/c-rainstorm/blog/blob/m ...

  7. Java反射机制分析指南

    一.JAVA是动态语言吗? 一般而言,说到动态言,都是指在程序运行时允许改变程序结构或者变量类型,从这个观点看,JAVA和C++一样,都不是动态语言. 但JAVA它却有着一个非常突出的动态相关机制:反 ...

  8. 反射 字段_详解面试中常考的 Java 反射机制

    反射(Reflection) 是 Java 程序开发语言的特征之一,它允许运行中的 Java 程序对自身进行检查,或者说"自审",并能直接操作程序的内部属性和方法. 反射是一项高级 ...

  9. 【java】java反射机制,动态获取对象的属性和对应的参数值,并属性按照字典序排序,Field.setAccessible()方法的说明【可用于微信支付 签名生成】...

    方法1:通过get()方法获取属性值 package com.sxd.test.controller;public class FirstCa{private Integer num;private ...

最新文章

  1. 疫情之下,“无接触”生意火了
  2. Centos7 搭建 hadoop3.1.1 集群教程
  3. mybatis配置log4j控制台打印SQL语句
  4. 使用RabbitMQ做的一些工作及经验教训
  5. 铁路联网售票系统 按计算机应用,铁路联网售票系统按计算机应用的分类它属于...
  6. 5年没发论文,读博想放弃?中科大博导万字自述:曾连收13封拒稿信...
  7. 目前使用SAP的公司列表
  8. freessl 免费https证书申请
  9. 程序员的思考:一年管理成富翁,三年市场路路通,十年技术一场空
  10. python contains 正则_Python 正则表达式
  11. 360隐私保护器重现 称不针对任何公司
  12. html九宫格排列图片,10款jQuery实现的360浏览器九宫格图片拖拽排序
  13. 华为路由器Serial接口及串口无法实现ACL访问控制解析
  14. 重命名技巧,支持多个文件夹快速重命名
  15. Nonebot QQ机器人插件五:随机网易云音乐
  16. 若依前后端分离框架——初始化参数功能源码学习
  17. 算法09——patA1033 加油站问题(贪心)
  18. 游戏输入控制的五条黄金法则
  19. 递归:汉罗塔问题的程序实现
  20. 计算机审计 报告哦,计算机审计实训报告.doc

热门文章

  1. ubuntu下安装invidia显卡驱动
  2. 第三十八讲项目二 打豆豆
  3. scala入门_Scala和Scalatra入门–第一部分
  4. 0xc0000005 系统应用日志_求大佬来个崩溃日志的分析教程
  5. 红帽系统配置nginx自启动
  6. nginx负载均衡测试实例
  7. java jdbc updatedelete的实现
  8. mysql 创建索引、删除索引、查看索引sql语句
  9. Linux运维实战:Centos逻辑卷磁盘挂载流程
  10. 【数学建模】清风视频笔记4、拟合算法