首先:
1:kafka是拉取模式的消息队列,是消费者控制什么时候拉取消息的;
2:每条消息都有一个偏移量,每个消费者都会跟踪最近消费消息的偏移量;

当消费者消费某条消息失败时?

有下面几种处理方式:

1:重试,不停的重试,直到成功;
可能导致的问题:
问题是若是这条消息(通过目前的代码)可能永远不能消费成功,
导致消费者不会继续处理后续的任何问题,导致消费者阻塞;

2:跳过,跳过这条没有消费的消息;
这个其实是不合理的,可能会造成数据不一致性;

解决方案?

1:方案一【重试】

这里需要建立一个专门用于重试的topic(retry topic);
当消费者没有正确消费这条消息的时候,
则将这条消息转发(发布)到重试主题(retry topic)上,然后提交消息的偏移量,以便继续处理下一个消息;
注意:
这个时候,这个没有正确消费的消息,其实对于这个消费者来说,也算是消费完成了,因为也正常提交了偏移量,只不过是业务没有正确处理,而且这个消息被发布到另一个topic中了(retry topic);
;;
之后再创建一个重试消费者,用于订阅这个重试主题,
只不过这个重试消费者,跟之前那个消费者处理相同的业务,两个逻辑是一样的;
;;
如果这个重试消费者,也无法小得这条消息,那就把这个消息发布到另一个重试主题上,并提交该消息的偏移量;
;;
循环,递归
;;
最后,当创建了很多重试消费者的时候,在最终重试消费者无法处理某条消息后,把该消息发布到一个死信队列(DLQ);

这种方式优缺点:
优点:可以重试,可能在重试中,就正常消费了;
缺点:可能一直重试,都不会正常消费,一直到死信队列中;
这个问题就比较大了,可能我们就是消费的是一个错误的消息,
比如说缺字段,或者数据库中根本就没有这个业务的id;
或者消息中包含特殊字段,导致无法消费;

这种是消息本身的错误,导致无法消费,除非你去解决消息,不然是永远不会成功的;

建议:
对于第一种:
消息正确,是因为业务或者其他问题导致第一次消费失败,其实可以让消费者自己去重试,知道消费成功;
对于第二种:
当然可以用上面那个重试主题,不然你这个消息阻塞了我后面的后续的消费,所以需要把这个消息分流,分流该消息会为我们的消费清除障碍;

重试主题的缺点:

1:它会忽略排序
当事件发布到同一分区时,可以保证各个事件按照它们发生的顺序进行处理。如果对同一聚合进行连续更改,并且所产生的事件发布到不同的分区,就可能发生争用状况,也就是消费者在消费第一个更改之前就消费了第二个更改。这会导致数据不一致。

重试主题真正出问题的地方。它让我们的消费者容易打乱处理事件的顺序。

重试主题什么时候可用:

需要明确的是,重试主题并非一直都是错误的模式。当然,它也存在一些合适的用例。具体来说,当消费者的工作是收集不可修改的记录时,这种模式就很不错。这样的例子可能包括:

统计日志啊
记录用户操作记录啊

这类消费者可能会从重试主题模式中受益,同时没有数据损坏的风险。

重试主题怎么优化:

1:消除错误类型;
如果我们能在生产者发送该消息的时候,就确定这个消息是正常的,可以被消费的,那么就没必要用重试主题了,直接让消费者重试就好了;
解决方案:
在消息的body中增加一个字段:
isRetry:是否重试
那么我们消费者拿到这个消息后,可以根据这个字段来判断生产者是否需要我们重试,
当然这个是最简单的一种;
还有根据错误类型来判断的:

void processMessage(KafkaMessage km) {try {Message m = km.getMessage();transformAndSave(m);} catch (Throwable t) {if (isRecoverable(t)) {// ...} else {// ...}}
}

在上面的 Java 伪代码示例中,isRecoverable()将采用一种白名单方法来确定 t 是否表示可恢复错误。换句话说,它检查 t 以确定它是否与任何已知的可恢复错误(例如 SQL 连接错误或 ReST 客户端超时)相匹配,如果匹配则返回 true,否则返回 false。这样就能防止我们的消费者被不可恢复错误一直阻塞下去。

当然这种其实也不太准,
例如,一个 SQLException 可能指的是一次数据库故障(可恢复)或一次约束违反状况(不可恢复)。

2:在消费者内重试可恢复错误

可恢复错误:就是在重试一段时间后,可能成功的;
不可恢复错误:你直接就是消息体错误,缺字段,结构错误,有特殊字符等;

存在可恢复错误时,将消息发布到重试主题毫无意义。我们只会为下一条消息的失败扫清道路。相反,消费者可以简单地重试,直到条件恢复。

当然,出现可恢复错误意味着外部资源存在问题。我们不断对这块资源发送请求是无济于事的。因此,我们希望对重试应用一个退避策略。我们的伪 Java 代码现在可能看起来像这样:

void processMessage(KafkaMessage km) {try {Message m = km.getMessage();transformAndSave(m);} catch (Throwable t) {if (isRecoverable(t)) {doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);} else {// ...}}
}

(注意:我们使用的任何退避机制都应配置为在达到某个阈值时向我们发出警报,并通知我们潜在的严重错误)

2:遇到不可恢复错误时,将消息直接发送到最后一个主题

另一方面,当我们的消费者遇到不可恢复错误时,我们可能希望立即隐藏(stash)该消息,以释放后续消息。但在这里使用多个重试主题会有用吗?答案是否定的。在转到 DLQ 之前,我们的消息只会经历 n 次消费失败而已。那么,为什么不从一开始就将消息粘贴在那里呢?

与重试主题一样,这个主题(在这里,我们将其称为隐藏主题)将拥有自己的消费者,其与主消费者保持一致。但就像 DLQ 一样,这个消费者并不总是在消费消息;它只有在我们明确需要时才会这么做。

总结:

不论我们是直接用消费者的重试,还是发送到重试主题,没有最好的解决方案,只有最适合自己的,需要根据自己的实际业务来选择最合适的处理方式,
最好能加人工告警机制,【代码不够,人工来凑】,这个也是我开发中经常用来背锅的一种思路;

kafka-整理-重试机制相关推荐

  1. 兄弟!kafka的重试机制,你可能用错了~

    点击上方蓝色"方志朋",选择"设为星标"回复"666"获取独家整理的学习资料! Apache Kafka 已成为跨微服务异步通信的主流平台. ...

  2. 恕我直言:你可能一直用错了 kafka 的重试机制

    Apache Kafka 已成为跨微服务异步通信的主流平台.它有很多强大的特性,让我们能够构建健壮.有弹性的异步架构. 同时,我们在使用它的过程中也需要小心很多潜在的陷阱.如果未能提前发现可能发生(换 ...

  3. 你可能用错了 kafka 的重试机制

    点击上方"码农突围",马上关注 这里是码农充电第一站,回复"666",获取一份专属大礼包 真爱,请设置"星标"或点个"在看 来源 ...

  4. 11. kafka重试机制解读

    前面对kafka的学习中已经了解到KafkaProducer通过设定参数retries,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数. 本片文 ...

  5. loadingcache 有重试机制吗_重试机制的实现

    服务在请求资源,如果遇到网络异常等情况,导致请求失败,这时需要有个重试机制来继续请求. 常见的做法是重试3次,并随机 sleep 几秒. 业务开发的脚手架,HTTP Client 基本会封装好 ret ...

  6. 面试官问:为什么MySQL的索引不采用Kafka的索引机制

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 第一眼看到这个问题,也是很迷惑的,谁没事会问这种问题.然而,事实上 ...

  7. invalid signature 错误原因验签失败_Nginx 失败重试机制

    可直接点击上方蓝字 (网易游戏运维平台) 关注我们,获一手游戏运维方案 src 网易游戏 SRE,喜欢钻研与分享. 背景 Nginx 作为目前应用较广的反向代理服务,原生提供了一套失败重试机制,来保证 ...

  8. 一文详细解析kafka重平衡机制

    前言 1.队列重平衡概述 如果对RocketMQ或者对消息中间件有所了解的话,消费端在进行消息消费时至少需要先进行队列(分区)的负载,即一个消费组内的多个消费者如何对订阅的主题中的队列进行负载均衡,当 ...

  9. 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...

  10. springboot 整合retry(重试机制)

    当我们调用一个接口可能由于网络等原因造成第一次失败,再去尝试就成功了,这就是重试机制,spring支持重试机制,并且在Spring Cloud中可以与Hystaix结合使用,可以避免访问到已经不正常的 ...

最新文章

  1. MySQL快速入门指南
  2. python没基础可以学吗-没编程基础可以学python吗
  3. ucore操作系统实验笔记 - Lab1
  4. 使用 JS 关闭警告框及监听自定义事件(amaze ui)
  5. python global 变量_python 全局变量和局部变量 (例子)
  6. HDU5528 - Count a * b
  7. java oca_OCA第1部分中的Java难题
  8. Linux内核RCU(Read Copy Update)锁简析
  9. 那些年我们追过的计算机经典书
  10. 格力又有新专利了:“一种铁芯冲片、电机及新能源汽车”
  11. Pandas 中的这 3 个函数,没想到竟成了我数据处理的主力
  12. Flutter入门一——W7环境下使用VSCode配置Flutter开发环境(脱离Android Studio安装)...
  13. 2017cad光标大小怎么调_关于调整input里面的输入光标大小
  14. 分布式系统的概念、特点及常见方案
  15. 未来不迎,当下不杂,过往不恋
  16. vue3 + uni-app 封装音乐播放插件
  17. Git命令全解析-前端备忘录
  18. 年轻人应不应该买房 如何买
  19. txfont与\texttt冲突问题
  20. Python基础 | Spyder的使用

热门文章

  1. DirectDraw高彩模式编程入门
  2. 【解决SQL Server sa连接服务器失败】用户‘sa‘登录失败。已成功与服务器建立连接,但是在登录过程中发生错误
  3. html5经纬度定位 源码_基于浏览器的HTML5地理定位
  4. 你知道吗?炒鞋的那帮人,现在去炒数字藏品了
  5. Compound原理
  6. lua的使用(摘自luachina)
  7. 小孔成像中四个坐标系转换
  8. java就业率高吗_java好就业吗
  9. AOJ-AHU-OJ-6 Hero in Maze
  10. CD网站用户消费行为的分析报告