一、背景说明

深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费。每分钟堆积近 100w 条数据。但是查看 ES 监控,各项指标都远还没到性能瓶颈。后天公司就要搞电商促销活动,到时候数据量是现在的至少 2 倍,这让客户很是着急。这究竟是怎么回事呢?该从何排查才能发现问题所在呢?下面我们一起还原“案发”现场。

二、客户面临问题及分析

集群使用场景:使用腾讯云 ES 集群存储业务日志数据。

集群架构:冷热架构。

集群规模:3个热节点:8C32G+4T SSD ;4个温节点:8C32G+5T高性能云盘。

数据链路:Filebeat 采集日志数据 ---> 腾讯云 kafka ----> 客户自建 logstash ----> 腾讯云 Elasticsearch

具体问题反馈:

kafka 的日常消息生产量在 260w/min。但是看 kafka 监控发现消费量只有180w/min。也就是说每分钟会堆积近 100w 条消息,积累了一段时间后,kafka 中堆积的数据量达到数亿条。

kafka 消息生产消费监控

问题分析:

经过电话沟通后,拿到了客户的 logstash 配置如下:

logstash.conf

input{kafka{bootstrap_servers => "xx.xx.xx.xx:9092"#topics => ["xx-yhtplus","xx-order","xx-user","xx-image","xx-goods","xx-activities","xx-wechat"]topics_pattern  => "xx-.*"consumer_threads => 24decorate_events => truegroup_id => "logstash"codec => "json"auto_offset_reset => "latest"
}
}
filter {mutate {convert => ["respones-time", "float"]}
}
output {elasticsearch {hosts => ["http://腾讯云VIP:9200"]user => ""password => ""index =>  "%{[@metadata][topic]}-%{+YYYY-MM-dd}"}
}

logstash.yml

pipeline.workers: 8
pipeline.output.workers: 8
pipeline.batch.size: 5000
pipeline.batch.delay: 10

经过了解发现,客户在腾讯云 tke 中启动了 8 个 logstash 进程,但是实际上只有 3 个是活跃的,另外 5 个一直处于空闲状态,且每个 logstash 进程只使用了不到 3 核的 CPU。

logstash 进程

客户反馈对 logstash.yml 配置文件做了多次调整,均不生效。

pipeline.workers 设置为 logstash 核数;

pipeline.batch.size 从 5000 到 20000 均有调整。

通过客户的配置优化反馈来看,问题应该不是出在 logstash.yml 配置的调整上,而极有可能出现在消费 kafka 的源头上。我们可以把 logstash 理解为一个水管,从kafka 上游取水,往下游 ES 中灌。既然上游的水有积压,那无非就是调大进水口或者调大出水口。既然 ES 还没到瓶颈,且 logstash.yml 相关配置无论怎么优化调整,依然没有更多的水灌到 ES 中来。那可以肯定的是问题不在出水口,而是在kafka 这侧的进水口出了点问题,即消费 kafka 的口子没有完全打开。

三、优化建议

经过和客户更细致的沟通,得到如下反馈:

  1. logstash 是统一消费一个消费组,该消费组中一共有 24 个 topic;

  2. 24 个 topic 中有 2 个 topic 数据量非常大,其他 22 个 topic 数据量一般;

  3. 每个 topic 设置为 3 个 partition。

得到如上反馈后,针对该问题,我这边给客户更进一步的优化方案如下:

1. 将 topic 进行拆分,两个大的 topic 分别单独作为一个消费组,其他的 22 个 topic 作为一个消费组,这样将一个消费组拆分出三个消费组进行消费;

2. 增大 topic 的 partition 数量,将两个大的 topic 的 partition 调整为 24,其他的22 个 topic 的 partition 调整为 8;

3. 起三组 logstash,分别消费对应的消费组;

4. 将每组 logstash 中 consumer_threads 和每组消费组的总 partition 大小设置保持一致,即保证每个 logstash 的 consumer_thread 数目* logstash 的进程数目 = kafka 对应 topic 的分区数。

起三组 logstash 消费进程

做完这些调整后,再次观察 kafka 的消费情况,已经从原来的 180w/min 提升到了520w/min。消费性能立马提升了近 3 倍。客户表示非常满意。再也不用担心两天后促销活动的消息堆积问题。

优化后的消费能力

四、问题解答

1、这个客户为什么用冷热分离的架构呢?

答:因为该客户对数据的保存时间有严格要求,即数据至少要保存两个月的时间,但是对 ES 的热节点 ssd 架构比较敏感,因此我们推荐了客户使用腾讯云 ES 的冷热分离的架构,即新索引在热节点上创建,数据保存一周后,通过 ES 提供的索引生命周期管理,自动将热节点上的数据迁移到冷节点中,冷节点使用腾讯云高性能云盘,价格相对 ssd 更加便宜。数据满 2 个月后,通过 ES 的索引生命周期自动将冷节点上的数据进行删除,以释放更多的存储空间。

2、明明设置了索引生命周期管理,但是热节点上的数据都超过一周了为什么还是没有迁移到冷节点?

答:索引的生命周期配置只会对新增的索引生效,默认对存量的索引是不生效的,这是为了防止对存量比较重要的索引造成误删除等不可逆的影响,如果需要对存量索引也生效的话,可以通过设置存量索引的 settings,关联对应的 Policy 即可。

往期推荐

3行代码能写出8个接口!同事这样做的

Java 服务性能优化,提升QPS

API 接口文档平台,多人协作太顺手了

Java方法调用的底层原理探秘

3w+字详解 23 种设计模式(多图 + 代码)

Nginx+keepalived 高可用,防盗链及动静分离配置解决方案,先收藏再看!

分布式事务七种解决方案,最后一种经典了!

回复干货】获取精选干货视频教程

回复加群】加入疑难问题攻坚交流群

回复mat】获取内存溢出问题分析详细文档教程

回复赚钱】获取用java写一个能赚钱的微信机器人

回复副业】获取程序员副业攻略一份

戳这儿

解决kafka 消息堆积问题的排查及调优相关推荐

  1. kafka 消息堆积解决

    一 :背景 线上kafka消费端因日志异常的解决导致消息堆积. 二 : 日志异常解决导致消息堆积 线上kafka消费端日志异常,频繁打印错误日志,服务器磁盘一天就满了,此时其他服务无法正常工作.报错如 ...

  2. kafka消息堆积原因解析

    kafka消息堆积,可以调节如下两个参数 max.poll.records 一次调用poll()返回的最大记录数. 默认值500 就是一次最多拉取500条记录 max.poll.interval.ms ...

  3. kafka消息堆积且CPU过高代码优化

    kafka消息堆积且CPU过高代码优化 直接部署已有的代码程序到线上服务器,发现CPU立马升高500%左右,立马停掉服务并看源代码排查问题,翻看代码,发现通过多线程消费 kafka消息,根据对多线程的 ...

  4. 平时只会用Kafka发消息,昨天突然遇到一次Kafka消息堆积生产事故!

    前言 线上kafka消息堆积,所有consumer全部掉线,到底怎么回事? 最近处理了一次线上故障,具体故障表现就是kafka某个topic消息堆积,这个topic的相关consumer全部掉线. 整 ...

  5. kafka性能测试、性能分析与性能调优

    前言:最近在做kafka.mq.redis.fink.kudu等在中间件性能压测,压测kafka的时候参考了这篇文章,大家可以借鉴下! 一.测试环境 测试使用到三台机器,机器配置如下: 共同配置: I ...

  6. 《Apache Kafka实战》读书笔记-调优Kafka集群

    <Apache Kafka实战>读书笔记-调优Kafka集群 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.确定调优目标 1>.常见的非功能性要求 一.性能( ...

  7. 技能篇:linux服务性能问题排查及jvm调优思路

    只要业务逻辑代码写正确,处理好业务状态在多线程的并发问题,很少会有调优方面的需求.最多就是在性能监控平台发现某些接口的调用耗时偏高,然后再发现某一SQL或第三方接口执行超时之类的.如果你是负责中间件或 ...

  8. kafka消息积压解决

    消息积压的解决方法 加强监控报警以及完善重新拉起任务机制,这里就不赘述了. 1.实时/消费任务挂掉导致的消费积压的解决方法 在积压数据不多和影响较小的情况下,重新启动消费任务,排查宕机原因. 如果消费 ...

  9. kafka怎么查看消息堆积_Kafka集群消息积压问题及处理策略

    阅读原文​mp.weixin.qq.com 通常情况下,企业中会采取轮询或者随机的方式,通过Kafka的producer向Kafka集群生产数据,来尽可能保证Kafk分区之间的数据是均匀分布的. 在分 ...

最新文章

  1. Android IOS WebRTC 音视频开发总结(三三)-- Periscope介绍
  2. VC6在64位Win7下调试无法退出的问题(缺少TLLOC.DLL和DM.dll)
  3. vscode更改插件路径_用好这7个 VS Code 插件,效率蹭蹭涨!
  4. linux如何找到桌面,我怎样才能找到我正在使用的桌面环境?
  5. RSA选用小公钥指数(e=3)真的不安全吗?
  6. php压制错误的代码,为什么要压制PHP错误?
  7. 一分钟带你快速进入Nacos的世界,史上最简易教程!零基础也能看明白!谁反对?
  8. Gcd HDU - 6545 (基础数论)
  9. rank()over 函数的使用
  10. 常用公有云接入——腾讯
  11. jar包中的类如何读取包内和包外的配置文件
  12. Vue脚手架组件开发常见问题
  13. java利用正则表达式弱密码检测
  14. 策略和投资组合分析-收益分析、风险回报分析和回撤分析
  15. 蝙蝠侠大战超人:正义黎明[Batman v Superman: Dawn of Justice]
  16. 苹果宣布前CEO史蒂夫·乔布斯逝世 世上再无乔布斯!
  17. 如何全备份android固件,如何一键备份安卓手机操作系统
  18. [论文笔记] Balboa: Bobbing and Weaving around Network Censorship
  19. 我教女朋友学编程Html系列(6)—Html常用表单控件
  20. 用GATK进行二代测序数据 SNP Calling 流程:(二)bwa比对和HaplotypeCaller 变异检测

热门文章

  1. (翻译)Attacking Interoperability(攻击互操作性)in Black Hat 2009 研究报告
  2. Python以及Pycharm安装、汉化详细教程
  3. 西游之路——python全栈——Django之ORM操作
  4. 云原生之路:容器技术落地最佳实践
  5. 所有大数据项目的流程是什么
  6. LeetCode(89):格雷编码 Gray Code(Java)
  7. Redis面试连环15问
  8. 使用thymeleaf实现静态化页面技术
  9. Python全栈(五)Web安全攻防之7.MySQL注入读写文件和HTTP头中的SQL注入
  10. Henrik Kniberg:乐高的基于社交的大规模敏捷计划会