kafka幂等性实现

1.原理阶段

在 0.11.0.0 之前的版本中, 如果 producer 没有收到表明消息已经被提交的响应, 那么 producer 除了将消息重传之外别无选择,这里提供的是at-least-once 的消息交付语义,因为如果最初的请求事实上执行成功了,那么重传过程中该消息就会被再次写入到 log 当中,从 0.11.0.0 版本开始,Kafka producer新增了幂等性的传递选项,该选项保证重传不会在 log 中产生重复条目

kafka官网描述的内容,为了实现这个目的:broker给每个producer都分配一个ID,并且producer给每条被发送的消息分配一个seq num(顺序号)来避免产生重复的消息.

同样是从0.11.0.0版本开始,producer新增了使用事务性的语义将消息发送到多个partition的功能:也就是说,要么所有的消息都被成功的写入到了log,要么一个都没写进去,这种语义的主要应用场景就是 Kafka topic 之间的 exactly-once 的数据传递

1.1 producer幂等性

producer的幂等性指的是当发送同一条消息时,数据在Service端只能被持久化一次,数据不重不丢,但是幂等性也是有条件的:

  1. 只能保证producer在单个会话内不丢不重,如果Producer出现意外挂掉再重启是无法保证的(幂等性情况下,无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重)
  2. 幂等性不能垮多个Partition ,只能保证单个partition内的幂等性,当实际多个partition时,这中间的状态并没有同步

注:如果需要垮会话、垮多个partition的情况,需要使用Kafka的事物来实现。

1.2 幂等性要解决的问题

幂等性是来解决什么问题的?

一般消息中间件提供的消息投递语义为:

At most once——消息可能会丢失但绝不重传。
At least once——消息可以重传但绝不丢失。
Exactly once——这正是人们想要的, 每一条消息只被传递一次.

在0.11.0.0之前,kafka支持的投递语义是At most once、At least once,**由于0.11.0.0增加了事物特性/幂等性,这使得Exactly once的实现成为了可能。**也就是说在0.11.0.0之前kafka通过配置producer端和consumer端配置可以保证数据不丢,也就是At least once,但是可能会导致数据重复(大部分都是由于发送失败,重试机制导致的数据重复)。

对于大多数业务场景,消息只要保证不丢,就基本上可以满足需求,但是对于业务要求较高的场景(比如支付数据等),它们是要求数据必须要准确的 ,这时候如果上游有重复数据,下游只能在消费数据时进行相应的去重,这样效率上会大打折扣。

所以针对某些场景下,由于上游数据重复的问题,导致所有精准计数需求的下游应用都需要做去重处理,如果可以在发送端就可以保证数据不重复,下游系统就能保证Exactly once,这对下游系统也是极大的解脱。
幂等性要解决的问题,主要是针对于数据重复的问题,所以这一章就让我们看下kafka的producer如何保证数据的Exactly once的

1.3 幂等性实现的原理

kakfa的投递语义0.11.0.0之前支持At most once、At least once这两种语义,幂等性要解决At least once的时候,由于异常等原因触发重试机制导致数据重复发送,幂等性的目的就是为了解决这个数据重复的问题。

那么我们是不是可以考虑针对于At least once+幂等性(唯一ID)处理 = Exactly once

要做到幂等就要解决以下问题

  1. 系统需要有能力鉴别一条数据是不是被处理过,通常处理办法是唯一ID,如果处理了该ID对ID进行缓存,这样是不是可以有效的判断数据是不是重复
  2. 唯一ID应该选择什么粒度的?是全集群公用?还是针对于某个producer ID,设置公用,还是针对于某个partition设置公用?如果kafka的解决方案是在partition的力度上做,重复数据的判断让partition的leader去判断处理,提前是producer请求要把唯一ID告知leader。partition 粒度实现唯一ID会不会导致其他问题呢?如果一个partition有来自多个producer 写入的情况,这些client之间很难做到使用同一个唯一ID,而采用producer ID+partition粒度,这样实现呢?是不是每一个priducer 都是独立的,只是在Service端对不同的client做好相应的分区即可

以上的分析,就不难理解producer幂等性的实现原理,Kafka producer在实现时有两个重要的机制:

第一点:PID(producer ID),在初始化事务时会向service发送InitProducerIdReques请求,获取pid,用于标示每个producer
第二点: Sequence numbers,client发送的每一个batch都会带有sequence number,Service端就死活根据这个值判断数据是否重复。

2.代码实现

2.1 申请PID

每个producer在初始化时都会被分配一个唯一的PID,对于一个新生成的pid,Sequence number将从0开始计数,每个Partition都会有独立的Sequence number,producer在发送数据时候,将会给每个batch(ProducerBatch)都分配一个seq num,Service就是通过这个来验证数据是否重复,这里的PID是全局唯一的,producer故障后重新启动会被分配一个新的PID,所以幂等性无法做到跨会话的原因。

2.1.1 客户端代码分析

  1. client 初始化KafkaProducer对象时初始化transactionManager,判断是否开启幂等或者事务机制,如果开启初始化transactionManager。

  2. 在client端Sender.runOnce中判断是否需要初始化producerID
    3.
    3.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded()内部代码

    这个时候,初始化producer ID的请求就会发送给broker端。

2.1.2 服务端代码分析

1.服务端收到初始化producerID请求

如果nextProducerId > currentProducerIdBlock.blockEndId则需要再向zk申请1000个pid,否则取nextProducerId

2.1.3 存储zk的PID信息

PID申请是向zk申请,zk中有/latest_producer_id_block节点,每个broker向zk申请一个PID端后,就会把自己申请的PID信息写入latest_producer_id_block节点,这样其他的broker再申请PID段时,会首先读这个节点信息,然后根据nextProducerId于block_end比较决定是否再次申请

格式:
{"version":1,"broker":0,"block_start":"0","block_end":"999"}

2.1.4 PID段申请步骤总结

  1. 如果nextProducerId > 该broker的block_end到步骤2,否则直接获取nextProducerId
  2. 如果该/latest_producer_id_block节点不存在,直接从0开始分配,选择0-999的PID段(Service 写死的分配1000个PID,无法配置,每次都要申请1000个)
  3. 如果该/latest_producer_id_block节点存在,否则读取到zk上的信息,让block_end+1再申请1000个,如果写入失败,证明其他节点可能已经更新了这个节点,重新直接步骤3即可,直到成功。

2.1.5 PID请求如何选择请求哪个broker呢?


如果没有开启事务的话,则请求发送到哪个broker是随机的。

如果开启事务的话(transaction ID!=null),则选择transaction/group Coordinator。

2.2 sequence numbers

有了PID之后,在PID+Partition级别上sequence numbers信息,就可以实现Producer的幂等性了。ProducerBatch也提供了setProducerState() 方法,它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断

// ProducerBatch
public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
}// MemoryRecordsBuilder
public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {if (isClosed()) {// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.// If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will// be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence// once a batch has been sent to the broker risks introducing duplicates.throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");}this.producerId = producerId;this.producerEpoch = producerEpoch;this.baseSequence = baseSequence;this.isTransactional = isTransactional;
}

2.3 客户端代码

直接从把所有批次数据进行抽取,整合发送给Service的代码开始讲解,对应方法accumulator.drain()



总结:客户端的代码很简单,就是通过topicPartitionBookkeeper维护了每一个partition的baseSeq从0开始增长,每个批次一个seq num,seq num增长并不是+1递增的,而是基于消息的个数,其实也可以理解成没一条消息都有一个seq num

2.4 服务端代码

kafkaApi的调用过程就不细讲了,这里涉及Kafka的Log存储原理解析,想更好的了解可以先去看看
,我们这里直接从Log.append()开始。


为啥是判断最近5条batch的元信息,因为在Service端是固定写死的会基于producerID维度保存最近5条batch的元信息(firstSeq、lastSeq、timestamp),如果超过5个则会先删除一个,再添加一个,这里一直维持5条batch的元信息。

2.4.1 为什么Service端是固定缓存5条批次信息?

其实原因我也不清楚
我猜测这里设置成5可能与压测结果有关,当不设置幂等性时,设置为5的效果可能更好。这其实就是在性能与有序性之间进行取舍。

缓存的条数越多,需要遍历的缓存就会越多,会降低吞吐率
缓存条数少,如果保证有序性,客户端一个连接可请求的数量就会少,客户端性能就会降低

2.4.2 max.in.flight.requests.per.connection >5的情况

max.in.flight.requests.per.connection这个参数:客户端的一个连接上允许出现未确认请求的最大数量,如果开启幂等性,这个参数设置不能超过5,如果超过5的话 会无法保证消息是幂等的。

假设max.in.flight.requests.per.connection=6,发送的请求是seq num分别是1、2、3、4、5、6,这时候server端只能存储2、3、4、5、6对应的batch数据,这时候如果请求1失败,需要进行重试,当重试请求发送到Service,在5条缓存的信息没有,就会开始检查seq num的值,如果不符合预期就会抛出OutOfOrderSequenceException异常,producer收到这个异常后会继续重试,直到达到最大重试次数(retries配置),这样不仅会影响Producer性能,还可能给Server带来不必要的压力。

3 幂等性实现流程图

3.2 服务端层面

kafka幂等性实现相关推荐

  1. Kafka幂等性与事务

    kafka幂等性 博客https://www.cnblogs.com/smartloli/p/11922639.html 幂等性: 主要解决单会话(producer宕机重启幂等性失效) 主要是引入了P ...

  2. kafka异步推送设置重试_一篇文章了解 Kafka 幂等性的原理及实践

    01 幂等性如此重要 Kafka作为分布式MQ,大量用于分布式系统中,如消息推送系统.业务平台系统(如结算平台),就拿结算来说,业务方作为上游把数据打到结算平台,如果一份数据被计算.处理了多次,产生的 ...

  3. 【Kafka】Kafka幂等性原理及实现剖析

    1.概述 转载并且补充:想看原文的请点击 https://www.cnblogs.com/smartloli/p/11922639.html 最近和一些同学交流的时候反馈说,在面试Kafka时,被问到 ...

  4. Java面试题:kafka幂等性+事务

    一面: hashmap,怎么扩容,怎么处理数据冲突?怎么高效率的实现数据迁移? Linux的共享内存如何实现,大概说了一下. Linux 中的用户模式和内核模式是什么含意? 在 Java 中 Lock ...

  5. kafka幂等性面试题,最新Java高级面试题汇

    前言 数据库相关的面试题早已成为了一线互联网大厂面试的家常菜,如果你对数据库不太熟悉,我劝你不要轻易面试大厂.那么,为什么数据库成了大厂面试的家常菜呢?主要原因当然还是海量数据. 无论对于刚入行的小白 ...

  6. 【MQ】Kafka如何保证幂等性

    文章目录 幂等性要解决的问题? Kafka 是怎么保证幂等性的? 开启幂等性配置 Kafka幂等性的局限性 事务 kafka默认情况下,提供的是至少一次的可靠性保障.即broker保障已提交的消息的发 ...

  7. kafka 丢弃数据_kafka 生产者和幂等

    Kafka优化总结 (不错) https://blog.csdn.net/zjh_746140129/article/details/88779640​blog.csdn.net Kafka面试题参考 ...

  8. 【kafka】Kafka 幂等 Producer

    1.概述 [Kafka]Kafka幂等性原理及实现剖析 [kafka]Kafka 事务性之幂等性实现 官网:Idempotent Producer 2.简介 Kafka提供了"至少一次&qu ...

  9. 一文读懂 Kafka 事务机制

    这篇文章主要讲述 Kafka 事务性相关原理,从 Kafka EOS 语义.幂等性.事务性等几个方面阐述. Kafka EOS 语义 EOS(Exactly Once Semantics,精确一次处理 ...

最新文章

  1. js函数语法:ASCII 码的相互转换,字符串操作,数学计算
  2. UA MATH565C 随机微分方程III Ito Isometry
  3. openresty开发系列15--lua基础语法4表table和运算符
  4. iOS 模糊化效果 ANBlurredImageView的使用
  5. golden gate 错误集锦
  6. 如何删除eclipse多余的工作空间
  7. GNU编译优化级别-O -O1 -O2 -O3
  8. [转载] 字符串操作截取后面的字符串_对字符串的5个必知的熊猫操作
  9. 怎么下载php源文件,设计了一个php下载当前文件,却把php源文件下载下来了,为何?...
  10. 客座编辑:崔辰州(1976-),男,博士,中国科学院国家天文台研究员、硕士生导师,国家天文台信息与计算中心主任...
  11. windows下快速创建大文件
  12. python免费教学视频400集-如何入门 Python 爬虫?400集免费教程视频带你从0-1全面掌握...
  13. mysql 去重 性能比较_mysql 去重方法distinct 与 group by 性能比较 | 学步园
  14. Python爬取中国天气网天气数据
  15. 计算机cmd管理员,cmd获取管理员权限的命令是什么
  16. vm15设置成中文界面
  17. 计算机优化英语课堂教学,多媒体课件优化英语课堂的反思性研究
  18. c语言中 c2059错误是,错误C2059:语法错误:'字符串'
  19. 虚拟机中使linux系统分辨率变大,能够在虚拟机全屏显示
  20. BGP(Border Gatreway Protcol)边界网关路由协议

热门文章

  1. GEE8:多个矢量点的NDVI连续数据的获取及分析(CSV数据)
  2. Python 单词数 统计一篇文章里不同单词的总数
  3. 联动添加redmine的wik
  4. win10+ubuntu16.04双系统 16.04无声音
  5. Java Web实训项目:西蒙购物园
  6. axure命令行_Axure RP Pro 4原创教程:(二)界面与功能
  7. 手机文件服务器app,有没有什么软件可以让手机和电脑相互传东西的?拜托各位大神?...
  8. 网络通信英文缩写-释义
  9. 人脸识别:1.DeepFace
  10. OpenAI 买下域名 AI.com,链接跳转到#ChatGPT