点击上方蓝色“程序猿DD”,选择“设为星标”

回复“资源”获取独家整理的学习资料!

作者 | Dong GuoChao

来源 | https://blog.dogchao.cn/?p=305

Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种。

Broker

Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux操作系统决定的。将数据存储到linux操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。

Broker在linux服务器上高速读写以及同步到Replica

上图简述了broker写数据以及同步的一个过程。broker写数据只写到PageCache中,而pageCache位于内存。这部分数据在断电后是会丢失的。pageCache的数据通过linux的flusher程序进行刷盘。刷盘触发条件有三:

  • 主动调用sync或fsync函数

  • 可用内存低于阀值

  • dirty data时间达到阀值。dirty是pagecache的一个标识位,当有数据写入到pageCache时,pagecache被标注为dirty,数据刷盘以后,dirty标志清除。

Broker配置刷盘机制,是通过调用fsync函数接管了刷盘动作。从单个Broker来看,pageCache的数据会丢失。

Kafka没有提供同步刷盘的方式。同步刷盘在RocketMQ中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应,类似ajax的callback或者是java的future。下面是一段rocketmq的源码。

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘

也就是说,理论上,要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如,减少刷盘间隔,减少刷盘数据量大小。时间越短,性能越差,可靠性越好(尽可能可靠)。这是一个选择题。

为了解决该问题,kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么producer是如何检测到数据丢失的呢?是通过ack机制,类似于http的三次握手的方式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/documentation.html

以上的引用是kafka官方对于参数acks的解释(在老版本中,该参数是request.required.acks)。

  • acks=0,producer不等待broker的响应,效率最高,但是消息很可能会丢。

  • acks=1,leader broker收到消息后,不等待其他follower的响应,即返回ack。也可以理解为ack数为1。此时,如果follower还没有收到leader同步的消息leader就挂了,那么消息会丢失。按照上图中的例子,如果leader收到消息,成功写入PageCache后,会返回ack,此时producer认为消息发送成功。但此时,按照上图,数据还没有被同步到follower。如果此时leader断电,数据会丢失。

  • acks=-1,leader broker收到消息后,挂起,等待所有ISR列表中的follower返回结果后,再返回ack。-1等效与 all 。这种配置下,只有leader写入数据到pagecache是不会返回ack的,还需要所有的ISR返回“成功”才会触发ack。如果此时断电,producer可以知道消息没有被发送成功,将会重新发送。如果在follower收到数据以后,成功返回ack,leader断电,数据将存在于原来的follower中。在重新选举以后,新的leader会持有该部分数据。数据从leader同步到follower,需要2步:

  1. 数据从pageCache被刷盘到disk。因为只有disk中的数据才能被同步到replica。

  2. 数据同步到replica,并且replica成功将数据写入PageCache。在producer得到ack后,哪怕是所有机器都停电,数据也至少会存在于leader的磁盘内。

上面第三点提到了ISR的列表的follower,需要配合另一个参数才能更好的保证ack的有效性。ISR是Broker维护的一个“可靠的follower列表”,in-sync Replica列表,broker的配置包含一个参数:min.insync.replicas。该参数表示ISR中最少的副本数。如果不设置该值,ISR中的follower列表可能为空。此时相当于acks=1。


如上图中:

  • acks=0,总耗时f(t) = f(1)。

  • acks=1,总耗时f(t) = f(1) + f(2)。

  • acks=-1,总耗时f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能依次递减,可靠性依次升高。

Producer

Producer丢失消息,发生在生产者客户端。

为了提升效率,减少IO,producer在发送数据时可以将多个请求进行合并后发送。被合并的请求咋发送一线缓存在本地buffer中。缓存的方式和前文提到的刷盘类似,producer可以将请求打包成“块”或者按照时间间隔,将buffer中的数据发出。通过buffer我们可以将生产者改造为异步的方式,而这可以提升我们的发送效率。

但是,buffer中的数据就是危险的。在正常情况下,客户端的异步调用可以通过callback来处理消息发送失败或者超时的情况,但是,一旦producer被非法的停止了,那么buffer中的数据将丢失,broker将无法收到该部分数据。又或者,当Producer客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是block阻塞),消息也会被丢失。抑或,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。

producer采取批量发送的示意图

异步发送消息生产速度过快的示意图

根据上图,可以想到几个解决的思路:

  • 异步发送消息改为同步发送消。或者service产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。

  • 扩大Buffer的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。

  • service不直接将消息发送到buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在buffer和service之间又加了一层空间更加富裕的缓冲层。

Consumer

Consumer消费消息有下面几个步骤:

  1. 接收消息

  2. 处理消息

  3. 反馈“处理完毕”(commited)

Consumer的消费方式主要分为两种:

  • 自动提交offset,Automatic Offset Committing

  • 手动提交offset,Manual Offset Control

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。此时消息就丢失了。


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自动提交开关
props.put("enable.auto.commit", "true");
// 自动提交的时间间隔,此处是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {// 调用poll后,1000ms后,消息状态会被改为 committedConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)insertIntoDB(record); // 将消息入库,时间可能会超过1000ms
}

上面的示例是自动提交的例子。如果此时,insertIntoDB(record)发生异常,消息将会出现丢失。接下来是手动提交的例子:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 关闭自动提交,改为手动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {// 调用poll后,不会进行auto commitConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);// 所有消息消费完毕以后,才进行commit操作consumer.commitSync();buffer.clear();}}

将提交类型改为手动以后,可以保证消息“至少被消费一次”(at least once)。但此时可能出现重复消费的情况,重复消费不属于本篇讨论范围。

上面两个例子,是直接使用Consumer的High level API,客户端对于offset等控制是透明的。也可以采用Low level API的方式,手动控制offset,也可以保证消息不丢,不过会更加复杂。

  try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 精确控制offsetconsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}

DD自研的沪牌代拍业务,点击直达

【往期推荐】

首支完全由 AI 创作的歌曲《未来之歌》发布!对于未来,你方了吗?

2020-12-16

Spring Security 实战干货:OAuth2授权回调的核心认证流程

2020-12-16

超卖 100 瓶茅台的事故分析

2020-12-15

重磅!GitHub 全部源代码被泄露?

2020-12-15

这本空降京东当当新书榜TOP1的“算法小抄”是什么来头?

2020-12-14

Redis 的 8 大数据类型,写得非常好!

2020-12-14

扫一扫,关注我

一起学习,一起进步

每周赠书,福利不断

深度内容

推荐加入

欢迎加入知识星球,一起探讨技术架构,交流技术人生。

加入方式,长按下方二维码:

已在知识星球更新如下:

素质二连,走一个

Kafka如果丢了消息,怎么处理的?相关推荐

  1. kafka发送mysql数据丢失_Kafka 如果丢了消息,怎么处理的?

    Kafka存在丢消息的问题,消息丢失会发生在Broker,Producer和Consumer三种. Broker Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞 ...

  2. java kafka分布式_Kafka分布式消息系统

    1.简介 Kafka是一个分布式消息系统,使用Scala语言进行编写,具有高水平扩展以及高吞吐量特性. 目前流行的消息队列主要有三种:ActiveMQ.RabbitMQ.Kafka ActiveMQ. ...

  3. kafka概述与下一代消息队列

    常用的消息中间件 消息中间件是当前处理大数据的一个非常重要的组件,用来解决应用解耦.异步通信.流量控制等问题,从而构建一个高效.灵活.消息同步和异步传输处理.存储转发.可伸缩和最终一致性的稳定系统.目 ...

  4. Apache Kafka:下一代分布式消息系统

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  5. 搭建高吞吐量 Kafka 分布式发布订阅消息 集群

    搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...

  6. kafka集群搭建(消息)

    1.Kafka使用背景 在我们大量使用分布式数据库.分布式计算集群的时候,是否会遇到这样的一些问题: 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位 我想对用户的搜索关键词进行 ...

  7. 阿里云消息队列Kafka商业化:支持消息无缝迁移到云上

    摘要: 7月25日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态.在兼容Apache生态的基础上,阿里云消息队列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性9 ...

  8. Kafka入门教程其一 消息队列基本概念 及常用Producer Consumer配置详解学习笔记

    文章目录 1. 综述 2. 消息队列(Message Queue) 2.1 点对点 2.2 发布/订阅(pub-sub) 3. Kafka基础术语解释 3.1 Broker 3.2 Partition ...

  9. 微信为啥不丢“离线消息”?

    参考 需求缘起 当发送方用户A发送消息给接收方用户B时,如果用户B在线,之前的文章<微信为啥不丢"在线消息"?>聊过,可以通过应用层的确认,发送方的超时重传,接收方的去 ...

最新文章

  1. mysql隐藏用户名_系统默认的MySQL用户名消失的解决方法(修正版)
  2. Linux内核移植之三:内核配置选项
  3. android从放弃到精通 第六天 excellent
  4. RabbitMQ面试题及答案
  5. 1.1 什么是Hive
  6. java魔法堂_Java魔法堂:调用外部程序
  7. linux下查看mysql的当前连接情况
  8. ASP.NET 页生命周期
  9. 电商领袖战:马云虚,东哥实
  10. php裁剪图片白边,php生成缩略图填充白边(等比缩略图方案)_PHP
  11. 边缘设备上的实时AI人员检测:在Raspberry Pi上测试SSD模型
  12. 建立云服务器_中国云游戏元年 顺网科技跻身头号玩家队列
  13. ROS2——通信接口(十)
  14. C++网站如何实现短信验证码功能?
  15. iphone换android手机铃声,在iPhone中换个自定义铃声的11个步骤
  16. 从企业实务角度解读 ITIL4 之14个通用管理实践
  17. 磨金石教育摄影技能干货分享|优秀摄影作品欣赏——世界掠影
  18. 简单易懂的Socket TCP网络通讯知识-消息协议和数据包
  19. 悉尼大学经济学荣誉升学及就业情况情况
  20. 计算机弹奏蔡徐坤,用了多年键盘才发现,CTRL键跟蔡徐坤有关,细思极恐!

热门文章

  1. Mac OS X下Maven的安装与配置
  2. ubuntu10.04 安装virtualbox
  3. mysql top 语句简介
  4. linux 编译错误 configure: error: C++ compiler cannot create executables
  5. python 找不到ssl模块问题 no module named _ssl
  6. linux ip_conntrack 连接满导致网络丢包
  7. python3 字符串填充 清除
  8. Windows API实现窗口居中
  9. Ubuntu命令行下安装,卸载软件包的过程
  10. 进程的用户栈和内核栈