kafka 消息堆积解决
一 :背景
线上kafka消费端因日志异常的解决导致消息堆积。
二 : 日志异常解决导致消息堆积
线上kafka消费端日志异常,频繁打印错误日志,服务器磁盘一天就满了,此时其他服务无法正常工作。报错如下
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: nullat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.lang.Thread.run(Thread.java:748)2021-05-06 17:54:38.467 ERROR 4998 --- [teParkcyy-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
最终查找到原因,是 servlet 生命周期过期,重启服务即可。
三 : 问题套娃
解决kafka消费端日志异常一天后,发现消息堆积,检查kafka消费端日志,正常刷日志,无明显异常信息 (最终发现,自己还是不仔细,日志太多了,夹杂在里面的报错信息被忽略了)
为了确认这个问题,我做了如下的测试来定位问题
1、查看kafka manager 是否有异常
管理界面大部分都看了,未发现异常情况,确认kafka 没问题
2、查看消息是否被消费到
找一个积压的topic ,对消费端的代码进行日志打印,查询消息是否被消费。
发现消息消费正常
3、查看数据库是否有锁表
查询是否因为数据库锁表或其他原因,导致消息消费了,但是没有入库,给我们的错觉是没有消费
发现数据库正常
此时发现一个重要的问题,重启kafka 之后,消息消费正常,并入库,但是到每个时间点入库停止,消费卡主,打印的消费日志停止,不在消费。
4、 再次查看kafka消费端日志
此时在消费端日志打印停止为界,向下查询日志问题,发现
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [parkBook]
日志显示parkBook topic 被拒绝访问
5、代码分析
parkBook topic 存在消费监听,此parkBook topic 应该是之前被人取消了,而我们kafka消费端对 topic 的加载是在启动时候直接加载到内存中,所以取消了并不会立马影响代码的错误。
在我们解决日志异常重启的时候,重新加载topic,导致此时内存中没有 parkBook topic ,监听失败,导致其他topic 也失败了
6、 问题解决
注释 parkBook topic 监听,重启kafka 消费端。
四 :优化
线上的kafka,是分两个Partition ,部署在一台机器上,随着数据的增加,消费能力不足以快速消费
我这边消费端是使用线程池。两倍了线程池的核心线程数、最大线程数
spring继承的kafka,配置 消费者线程为2 :spring.kafka.listener.concurrency=2
kafka 消息堆积解决相关推荐
- kafka消息堆积原因解析
kafka消息堆积,可以调节如下两个参数 max.poll.records 一次调用poll()返回的最大记录数. 默认值500 就是一次最多拉取500条记录 max.poll.interval.ms ...
- kafka消息堆积且CPU过高代码优化
kafka消息堆积且CPU过高代码优化 直接部署已有的代码程序到线上服务器,发现CPU立马升高500%左右,立马停掉服务并看源代码排查问题,翻看代码,发现通过多线程消费 kafka消息,根据对多线程的 ...
- 平时只会用Kafka发消息,昨天突然遇到一次Kafka消息堆积生产事故!
前言 线上kafka消息堆积,所有consumer全部掉线,到底怎么回事? 最近处理了一次线上故障,具体故障表现就是kafka某个topic消息堆积,这个topic的相关consumer全部掉线. 整 ...
- 解决kafka 消息堆积问题的排查及调优
一.背景说明 深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费.每分钟堆积近 100w 条数据.但是查看 ES 监控,各项指标都远还没到性能瓶颈.后天公司就要搞电商促销活动,到 ...
- kafka消息积压解决
消息积压的解决方法 加强监控报警以及完善重新拉起任务机制,这里就不赘述了. 1.实时/消费任务挂掉导致的消费积压的解决方法 在积压数据不多和影响较小的情况下,重新启动消费任务,排查宕机原因. 如果消费 ...
- kafka怎么查看消息堆积_Kafka集群消息积压问题及处理策略
阅读原文mp.weixin.qq.com 通常情况下,企业中会采取轮询或者随机的方式,通过Kafka的producer向Kafka集群生产数据,来尽可能保证Kafk分区之间的数据是均匀分布的. 在分 ...
- 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?...
大家好,我是 yes. 最近我一直扎在消息队列实现细节之中无法自拔,已经写了 3 篇Kafka源码分析,还剩很多没肝完.之前还存着RocketMQ源码分析还没整理.今儿暂时先跳出来盘一盘大方向上的消息 ...
- kafka消息消费有延迟_消息中间件选型分析---从Kafka与RabbitMQ的对比来看全局
有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说.对此笔者专门撰稿一篇内功心法:如何看待消息中间件的选型,不过这篇只表其意 ...
- 为什么要使用kafka消息队列
1.为什么要使用 kafka?为什么要使用消息队列? 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafk ...
最新文章
- hdu 5294 Tricks Device
- php ci laravel,PHP 框架 ci 和 laravel 的问题
- Codeforces 482E ELCA (LCT)
- 纯原生仿ES6的Object.assign,实现深度合并对象
- chrome解决跨域(CORS)问题---chrome插件
- gcc malloc/free的质疑
- 俄罗斯互联网提供商巨头Rostelecom遭遇DDoS攻击企图
- ui-router 路由重定向
- html微博分享功能,js页面文字选中后分享到新浪微博实现
- python计算复合材料层合板ABD刚度矩阵、预测层合板强度
- 如何在word中安装grammarly
- Windows PC 微信不显示头像或表情
- 使用三轴XYZ平台绘制空心字
- 指纹识别综述(5): 分类与检索
- javascript一种新的对象创建方式-Object.create()
- Windows Server 2003上搭建FTP服务器(IIS同理)
- spotify文件下载路径_从计算机的音乐文件夹中自动执行Spotify上的播放列表
- python计算机器人运动学分析_机器人学之逆运动学数值解法及SVD算法
- PageHelper 分页插件使用总结
- python拍照搜题_中国大学慕课mooc用Python玩转数据期末考试搜题公众号答案