文章目录

  • Flink的Exactly Once
  • 从Flink和Kafka组合来理解Exactly_Once
  • Two-Phase Commit(两阶段提交协议)
  • 两阶段提交Flink中应用

Flink的Exactly Once

端对端精确一次处理的意义是:每条消息只会影响最终结果一次即只会影响应用状态异一次,而非被处理一次,即使出现机器故障或者软件崩溃,Flink也要保证不会有数据被重复处理或者没被处理的情况来影响到状态。
Flink 1.4 版本之前,精准一次处理只限于 Flink 应用内,也就是所有的 Operator 完全由 Flink 状态保存并管理的才能实现精确一次处理。但 Flink 处理完数据后大多需要将结果发送到外部系统,比如 Sink 到 Kafka 中,这个 过程中 Flink 并不保证精准一次处理。
在 Flink 1.4 版本正式引入了一个里程碑式的功能:两阶段提交 Sink,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,kafka 0.11之后,Producer的send操作现在是幂等的,在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次,

实现精确一次处理语义(英文简称:EOS,即 Exactly-Once Semantics)。 在 Flink 中需要端到端精准一次处理的位置有三个:
Source端:数据从上一阶段进入到 Flink 时,需要保证消息精准一次消 费。

Flink 内部端:Flink 分布式快照保存数据计算的状态和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量(也就是利用 Checkpoint 机制,把状态存盘, 发生故障的时候可以恢复,保证内部的状态一致性)

Sink端:将处理完的数据发送到下一阶段时,需要保证数据能够准确无 误发送到下一阶段

从Flink和Kafka组合来理解Exactly_Once


如上图所示,Flink 中包含以下组件:

  1. 一个 Source,从 Kafka 中读取数据(即 KafkaConsumer)
  2. 一个时间窗口化的聚会操作(Window)
  3. 一个 Sink,将结果写入到 Kafka(即 KafkaProducer)

若要 Sink 支持精准一次处理语义(EOS),它必须以事务的方式写数据到 Kafka, 这样当提交事务时两次 Checkpoint 间的所有写入操作当作为一个事务被提交。 这确保了出现故障或崩溃时这些写入操作能够被回滚。 当然了,在一个分布式且含有多个并发执行 Sink 的应用中,仅仅执行单次提交或 回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得 到一个一致性的结果。Flink 使用两阶段提交协议以及预提交(Pre-commit)阶段来 解决这个问题。

Two-Phase Commit(两阶段提交协议)

两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即 实现 ACID 中的 A (原子性)。 在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值, 要么都不改,以此来达到数据的强一致性。 两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant), 其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。

两阶段提交将提交过程划分为连续的两个阶段:

  • 表决阶段(Voting)
  • 提交阶段(Commit)

第一阶段:表决阶段

  • 1.协调者向所有参与者发送一个 VOTE_REQUEST 消息。
  • 2.当参与者接收到 VOTE_REQUEST 消息,向协调者发送 VOTE_COMMIT 消息 作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备 好或遇到其他故障,就返回一个 VOTE_ABORT消息,告诉协调者目前无法 提交事务。

第二阶段:提交阶段

  • 1.协调者收集来自各个参与者的表决消息。如果所有参与者一致认>为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与 者发送一个GLOBAL_COMMIT消息,通知参与者进行本地提交;如果所有 参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务,向所 有参与者广播一条
    GLOBAL_ABORT 消息通知所有的参与者取消事务。
  • 2.每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一 个
    GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务

两阶段提交Flink中应用

Flink的两阶段提交:从 Flink 程序启动到消费 Kafka 数据,最后到 Flink 将数据 Sink 到 Kafka 为止,来分析 Flink 的精准一次处理

1.当 Checkpoint 启动时,JobManager 会将检查点分界线(checkpoint battier)注入数据流,checkpoint barrier 会在算子间传递下去,如下 如所示

2.Source 端:Flink Kafka Source 负责保存 Kafka 消费 offset,当 Chckpoint 成功时 Flink 负责提交这些写入,否则就终止取消掉它们, 当 Chckpoint 完成位移保存,它会将 checkpoint barrier(检查点分界 线) 传给下一个 Operator,然后每个算子会对当前的状态做个快照,保 存到状态后端(State Backend)
对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置 开始重新消费数据,如下图所示:

3.Slink 端:从 Source 端开始,每个内部的 transform 任务遇到 checkpoint barrier(检查点分界线)时,都会把状态存到 Checkpoint 里。 数据处理完毕到 Sink 端时,Sink
任务首先把数据写入外部 Kafka,这 些数据都属于预提交的事务(还不能被消费),此时的 Pre-commit 预提 交阶段下 Data Sink 在保存状态到状态后端的同时还必须预提交它的外部 事务,如下图所示:

4.当所有算子任务的快照完成(所有创建的快照都被视为是 Checkpoint 的 一部分),也就是这次的 Checkpoint 完成时,JobManager 会向所有任务 发通知,确认这次 Checkpoint 完成,此时 Pre-commit 预提交阶段才算完 成。才正式到两阶段提交协议的第二个阶段:commit 阶段。该阶段中 JobManager 会为应用中每个 Operator 发起 Checkpoint 已完成的回调 逻辑。
本例中的 Data Source 和窗口操作无外部状态,因此在该阶段,这两个 Opeartor 无需执行任何逻辑,但是 Data Sink 是有外部状态的,此时我 们必须提交外部事务,当 Sink 任务收到确认通知,就会正式提交之前的 事务,Kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费 了,如下图所示:


注:Flink 由 JobManager 协调各个 TaskManager 进行 Checkpoint 存储, Checkpoint 保存在 StateBackend(状态后端) 中,默认 StateBackend 是内存级 的,也可以改为文件级的进行持久化保存
总结:
1.Flink消费到Kafka数据之后,就会开启一个Kafka的事务,正常写入Kafka分区日志标记但未提交,也就是预提交(Per-commit)
2.一旦所有的Operator完成各自的Per-commit,他们会发起一个commit操作
3.如果有任意一个Per-commit失败,所有其他的Per-commit必须停止,并且Flink会回滚到最近成功完成的CheckPoint
4.当所有的Operator完成任务时,Sink端就收到checkpoint barrier(检查点分界线),Sink保存当前状态,存入Checkpoint,通知JobManager,并提交外部事物,用于提交外部检查点的数据
5.JobManager收到所有任务通知,发出确认信息,表示Checkpoint已经完成,Sink收到JobManager的确认信息,正式提交这段时间的数据
6.外部系统(Kafka)关闭事务,提交的数据可以正常消费了

一旦Per-commit完成,必须要确保commit也要成功,Operator和外部系统都需要对此进行保证。如果commit失败(例:网络崩溃导致等),Flink应用就会崩溃,然后根据用户重启策略进行重启,之后在重试commit。这个过程非常重要,因为如果commit无法顺利执行,就有可能出现数据丢失情况,因此,所有的Operator必须对Checkpoint最终结果达成共识:要么所有的Operator都认定数据提交执行成功,要么所有Operator认定数据提交失败,然后终止然后回滚

  • FlinkSinktoKafka
package com.liu.coreimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}object Demo6SInkKafka {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 每 1000ms 开始一次 checkpointenv.enableCheckpointing(20000)// 高级选项:// 设置模式为精确一次 (这是默认值)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 确认 checkpoints 之间的时间会进行 500 msenv.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)// Checkpoint 必须在一分钟内完成,否则就会被抛弃env.getCheckpointConfig.setCheckpointTimeout(60000)// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)val config: CheckpointConfig = env.getCheckpointConfig//任务失败后自动保留最新的checkpoint文件config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置状态后端,保存状态的位置val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)env.setStateBackend(stateBackend)val properties = new Properties()properties.setProperty("bootstrap.servers", "master:9092")properties.setProperty("group.id", "asdasdas")//创建flink kafka 消费者val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("exactly_once", new SimpleStringSchema(), properties)//如果有checkpoint,不是读取最新的数据,而是从checkpoint的位置读取数据flinkKafkaConsumer.setStartFromLatest()val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)val wordsDS: DataStream[String] = kafkaDS.flatMap(_.split(","))//将数据写回到kafka中//会导致数据重复/* val myProducer = new FlinkKafkaProducer[String]("master:9092", // broker 列表"sink_kafka", // 目标 topicnew SimpleStringSchema) // 序列化 schema*/val properties1 = new Propertiesproperties1.setProperty("bootstrap.servers", "master:9092")//事务的超时时间properties1.setProperty("transaction.timeout.ms", 5 * 60 * 1000 + "")//创建生产者val myProducer = new FlinkKafkaProducer[String]("sink_kafka", // 目标 topicnew SimpleStringSchema,properties1,null, //分区方法Semantic.EXACTLY_ONCE, // 唯一一次5) // 序列化 schemawordsDS.addSink(myProducer)env.execute()}
}

Flink的端对端精准一次处理(Exactly-Once)相关推荐

  1. 吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

    长文预警, 全文两万五千多字, 37页word文档的长度 (略有杂乱,有些非常复杂的地方可能需要更多的例子来说明,使得初学者也能很容易看懂,但是实在花的时间已经太多太多了,留待后边利用起碎片时间一点点 ...

  2. StarRocks X Flink CDC,打造端到端实时链路

    实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大.以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务.在部分领域,数 ...

  3. 端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结(压箱宝具呕血之作)

    前 这篇文章可以说是作者压箱底儿的知识总结(之一,毕竟作者学的东西很杂 ╮( ̄▽ ̄"")╭ )了. 断断续续写了将近三个月, 耗费了大量的精力, 本来的目的本来只是想对比一下各个s ...

  4. Flink 状态一致性:端到端状态一致性的保证

    文章目录 状态一致性 什么是状态一致性 状态一致性种类 端到端(end-to-end)状态一致性 Sink端到端状态一致性的保证 Flink+Kafka端到端状态一致性的保证 状态一致性 什么是状态一 ...

  5. Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

    文章目录: Apache Flink 应用程序中的 Exactly-Once 语义 Flink 应用程序端到端的 Exactly-Once 语义 示例 Flink 应用程序启动预提交阶段 在 Flin ...

  6. kafka window 启动_Apache Flink结合Kafka构建端到端的Exactly-Once处理

    Apache Flink自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira).它提取了两阶段提交协议的 ...

  7. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

  8. flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理

    周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...

  9. Flink(53):Flink高级特性之端到端精确一次消费(End-to-End Exactly-Once)

    目录 0. 相关文章链接 1. 流处理的数据处理语议 1.1. At-most-once-最多一次 1.2. At-least-once-至少一次 1.3. Exactly-once-精确一次 1.4 ...

最新文章

  1. FPGA(4)晶振与计数器 -- 实现定时器(led定时闪烁、蜂鸣器频率控制(单响)、蜂鸣器报警(频带控制,多响))
  2. 你知道什么是Jest吗?
  3. python3 xrange *的作用
  4. tomcat部署与Context
  5. BUUCTF的Web真题学习整理(一)
  6. 第一次作业:深入Linux源码分析进程模型
  7. [html] HTML与XHTML二者有不同
  8. oracle 朱志辉_DB2设计、管理与性能优化艺术
  9. AI 算法起家的今日头条为何败给了色情?
  10. 数学归纳法在数据结构与算法分析设计中的应用
  11. 数据结构—线性表及其实现方式
  12. [tensorflow]tensorflw2.1.0张量和变量介绍
  13. [精简]托福核心词汇20
  14. [USACO20JAN]Loan Repayment S
  15. idea程序参数program arguments,vm arguments,Environment variable
  16. CompletableFuture学习
  17. 202109-4 收集卡牌
  18. 粒子群与蚁群算法的区别
  19. (附源码)计算机毕业设计SSM职业学校招生系统
  20. 全媒体时代下高校思想政治教育目标应坚持的原则

热门文章

  1. CSS实现文字和图片的水平垂直居中
  2. Vue环境下用ECharts绘制中国地图,并实现拖动、缩放与各省份自动轮播高亮显示
  3. 怎样检测和维护LED显示屏系统
  4. 删除数据后无法恢复的固态盘
  5. 制作横版游戏KillBear第9课:暂停层+屏蔽下层监听
  6. 解惑:如何使得寝室的电脑和实验室的电脑远程相互访问(Linux和Windows)
  7. wordcloud绘制词云彩
  8. 蓝桥杯 成绩分析 python组
  9. 冠珠瓷砖打造民族文化品牌,让中国陶成为中国潮
  10. Hacking Diablo II之外挂的工作原理