文章目录

  • Spring Boot Kafka - 如何处理毒丸(Poison Pill)消息
    • 前言
    • 模拟毒丸(Poison Pill)消息
    • 使用ErrorHandlingDeserializer处理反序列化失败
    • 参考文档

Spring Boot Kafka - 如何处理毒丸(Poison Pill)消息

前言

在Spring Boot Kafka - 序列化和反序列化JSON 一文中描述了如何在发送消息和接收消息时作JSON序列化和反序列化。

但是上文只考虑了最乐观的情况,如何接收消息时反序列化失败,会怎么样?

如果反序列化失败,就会出现毒丸(Poison Pill)现象,Consumer会卡在“反序列化失败-重试-反序列化失败”的死循环中,无法再处理后续消息。

错误消息类似:

java.lang.IllegalStateException: **This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer**...Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition users-0 at offset 17. If needed, please seek past the record to continue consumption.

模拟毒丸(Poison Pill)消息

利用Kafka脚本来发送消息:

kafka-console-producer.sh --bootstrap-server kafka:9092 --topic users# 输入任意字符串,按回车

这时候发送的消息是未经JSON序列化的消息,所以会导致Consumer的JSON反序列化失败。

使用ErrorHandlingDeserializer处理反序列化失败

application.yaml 中配置ErrorHandlingDeserializer反序列化。

server:port: 8080
spring:kafka:consumer:bootstrap-servers: localhost:9092group-id: demo-groupauto-offset-reset: earliestkey-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializervalue-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializerproperties:spring.json.trusted.packages: '*'spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializerspring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializerproducer:bootstrap-servers: localhost:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializer

说明:

  • 将Consumer的key-deserializervalue-deserializer 都配置为 org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  • 并委任具体的Key和Value反序列化器:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
  • 在Key或Value反序列化失败时,ErrorHandlingDeserializer 确保毒丸(Poison Pill)消息被处理掉并记录日志,Consumer offeset可以向前移动,使得Consumer可以继续处理后续的消息。

参考文档

  • Spring for Apache Kafka – Beyond the Basics: Can Your Kafka Consumers Handle a Poison Pill? 对毒丸(Poison Pill)现象的背后原理和解决方法有详细的讲解。
  • Emailing microservice with Apache Kafka and Spring Boot made easy
  • Spring Kafka - Using ErrorHandlingDeserializer
  • Better way of error handling in Kafka Consumer

Spring Boot Kafka - 如何处理毒丸(Poison Pill)消息相关推荐

  1. spring boot+kafka+canal实现监听MySQL数据库

    spring boot+kafka+canal实现监听MySQL数据库 一.zookeeper安装 kafka依赖于zookeeper,安装kafka前先安装zookeeper 下载地址:Apache ...

  2. 使用Spring Boot和Project Reactor处理SQS消息-第2部分

    这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距. 1.处理SQS客户端调用中的失败 2.该方法一次只能 ...

  3. spring boot 整合钉钉机器人发送消息通知

    钉钉消息通知 主要用于系统预警.资源预警.重要消息通知,随时随地可以掌握重要信息 一.通知效果 1.文本通知 2.带链接的通知 3.makrdown格式 通知 4.ActionCard 通知 5.Fe ...

  4. 【Spring Boot】Spring Boot之整合RabbitMQ并实现消息的发送和接收

    一.项目配置 1)引入maven坐标 <!--amqp--><dependency><groupId>org.springframework.boot</gr ...

  5. spring boot 自学笔记(八) Rabbitmq 延迟消息(插件)

    在前面文章有通过Rabbit的死信方式来实现延迟队列机制, 但是这种方式有极大的弊端, 机试不考虑死信队列性能问题,另外发送的消息并不能保证时间延迟的可靠性,. 举例如下: 同时发送两条延迟消息,分别 ...

  6. 使用Spring Boot和Project Reactor处理SQS消息

    我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息. 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法. 以下 ...

  7. 关于Kafka Spring Boot的教程

    Apache Kafka是一个分布式数据流平台,具有发布和订阅数据流,以容错方式存储记录以及处理该数据流等功能. 它用于构建实时流数据管道,可以执行功能,例如将数据流从一个应用程序可靠地传递到另一个应 ...

  8. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  9. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

最新文章

  1. 全网唯一一个可以复现成功的光流计算项目
  2. 组件化开发和模块化开发概念辨析
  3. 昨日-[鲁豫有约]: 乔羽
  4. 蓝桥杯java第六届决赛第二题--五星填数
  5. 过去式加ed的发音_过去式的变化规律,掌握诀窍了吗?
  6. ejb生命周期_无状态EJB:池化和生命周期
  7. Linux内核:进程上下文切换
  8. 关于 exynos 4412 按键中断 异步通知
  9. 5. http://kb.cnblogs.com/page/90838/
  10. 响应式精美列表商城卡密自动发卡源码
  11. C++ plus Primer 第六版中文版 带书签的 PDF
  12. 咸鱼ZTMR实例—PS2手柄
  13. DedeCMS总是提示验证码不正确的解决方法
  14. 如何在Excel中隐藏单元格,行和列
  15. BLE-1の蓝牙4.0协议栈概览
  16. 2022危险化学品生产单位安全生产管理人员考试题库模拟考试平台操作
  17. 2021年2月CFA考试费用是多少?
  18. 用计算机怎么调闹钟,电脑怎么设置闹钟,电脑可以设置闹钟吗?
  19. 关于Iphone3和Iphone4按键Home失效
  20. 登峰造极的人物 God Like

热门文章

  1. Python OpenCV画圆
  2. html 字体思源_css设置文字思源雅黑,分为medium, regular, light
  3. 狂神-SpringCloud笔记-总
  4. 浏览器兼容性问题整理
  5. 【SwiftUI模块】0008、SwiftUI-自定义启动闪屏动画-App启动闪屏曲线路径动画
  6. 【C语言】sizeof和strlen的区别
  7. ufvm可以读哪些网格_FVM in CFD 学习笔记_第7章_OpenFOAM和uFVM中的有限体积网格
  8. word内容无法复制
  9. CRM客户关系管理系统之数据对企业重要性
  10. mysql过滤查询结果,IF的使用