原文地址:https://www.shared-code.com/article/156

公司小伙伴反馈自己负责的RocketMq集群忽然有两个队列不消费了,消息堆积达到了1万多条,这个肯定不正常。

以下是当时的消费组的实际消费情况

从上面的图中可以看出来,有两个队列严重阻塞了,好久没有上报过offset了。

通过以往自己的经验,一般来说,RocketMq部分队列消费失败,主要有以下三个原因

  1. 消费者的集群数量有问题,比如之前笔者有写过一篇文章,也是介绍RocketMq部分队列不能正常消费的,有兴趣的可以看看RocketMq 部分队列不能消费问题排查
  2. 消费者的consumer group乱用,就是明明你不消费这个topic , 但是你的消费组名称取的跟别人正常消费的一样,这个时候,就会导致一部分队列分配到你这个节点身上,但是这些节点你本身又没有去消费,就会造成消费混乱
  3. 消费者线程阻塞

上面列的三点,基本上罗列了笔者遇到的情况以及能想到的情况,欢迎大家补充!

通过逐一排查,第一点和第二点的可能性几乎没有,大家用的都还算比较规范,那就怀疑第三点了。

判断一个线程是否阻塞,最好的办法就是通过jstack命令查看整个线程栈的信息。

这里笔者使用 https://gceasy.io/ 来做线程分析

第一次拉取

第二次拉取

拉了两次线程栈信息,清楚的可以看到两次都有5个rocketMQ消费端线程处于运行状态, 这个就有很大的问题。接下来看下栈执行的代码,

ConsumeMessageThread_20
priority:5 - threadId:0x00007fcbbdca7000 - nativeId:0x3675 - nativeId (decimal):13941 - state:RUNNABLE
stackTrace:
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
at sun.security.ssl.InputRecord.read(InputRecord.java:532)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000cceb4ab8> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000cceb6098> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000cceb6070> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000ccecbe30> (a sun.net.www.MeteredStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.xxx.utils.GaodeUtil.readFileByUrl(GaodeUtil.java:87)
at com.xxx.utils.GaodeUtil.getDirection(GaodeUtil.java:73)
at com.xxx.utils.GaodeUtil.getDistanceByLngLat(GaodeUtil.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

5个线程的栈信息几乎一致,可以看到都是堵在了 GaodeUtil.readFileByUrl 这个方法上面,最终底层是堵在了socketRead上面。

找到了业务代码就很好办了,直接去代码里面查看代码。

private String readFileByUrl(String urlString) {try {StringBuffer html = new StringBuffer();URL url = new URL(urlString);HttpURLConnection conn = (HttpURLConnection)url.openConnection();InputStreamReader isr = new InputStreamReader(conn.getInputStream(), "UTF-8");BufferedReader br = new BufferedReader(isr);String temp;while ((temp = br.readLine()) != null) {html.append(temp).append("\n");}br.close();isr.close();return html.toString();} catch (Exception e) {log.error("GaodeUtil.readFileByUrl error:", e);return null;}}

readFileByUrl 的代码如上, 这帮兔崽子写代码都不仔细思考的,这个很明显可以看出来问题,在发起http连接的时候,没有设置超时时间。

HttpURLConnection是基于HTTP协议的,其底层通过socket通信实现。如果不设置超时(timeout),在网络异常的情况下,可能会导致程序僵死而不继续往下执行

好了,问题查清楚了, 是由于消费消息处理业务的时候,发起http请求,没有设置超时时间,导致在出现网络异常的情况下,程序直接僵死。

解决办法就是 重启机器(恢复业务运行), 修改代码,增加超时时间。

欢迎关注我的公众号:sharedCode

消息队列 | RocketMq某些队列阻塞不消费相关推荐

  1. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  2. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  3. rocketmq python 某个队列不消费_消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?...

    关于 消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?的搜索结果 回答 2021一月拼团已有400余人拼团成功最低一折 点击进入:一月新人专场 服务器配置时间价格1核2G1年84元1核 ...

  4. 消息队列之事务消息,RocketMQ 和 Kafka是如何做的?

    一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID. 通常我们理解的事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统 ...

  5. 芋道 Spring Boot 消息队列 RocketMQ 入门

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  6. 分布式消息队列RocketMQ 快速入门

    分布式消息队列RocketMQ 一 RocketMQ概述 概述 1.MQ简介 MQ,Message Queue,是一种提供消息队列服务的中间件,是一套提供了消息生产.存储.消费全过程API的软件系统. ...

  7. 什么是消息队列 RocketMQ 版?

    消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟.高并发.高可用.高可靠的分布式消息中间件.消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰 ...

  8. 消息队列RocketMQ应对双十一流量洪峰的“六大武器”

    作者:不周 审核校对:岁月.明锻 编辑&排版:雯燕 " 4982 亿,58.3 万笔/秒 "的背后 在新冠肺炎疫情催化下,数字化生活方式渐成新常态."4982 亿 ...

  9. 基于消息队列 RocketMQ 的大型分布式应用上云较佳实践

    作者|绍舒 审核&校对:岁月.佳佳 编辑&排版:雯燕 前言 消息队列是分布式互联网架构的重要基础设施,在以下场景都有着重要的应用: 应用解耦 削峰填谷 异步通知 分布式事务 大数据处理 ...

最新文章

  1. “编程能力差!90%输在这点上”谷歌AI专家:其实都是瞎努力!
  2. 一个项目在启动前都应该明确哪些事
  3. CPU+GPU异构计算完全解析
  4. 数据链路层(学习笔记)
  5. 短信广告中虚拟网关发送与电信网关发送的区别
  6. GOF之结构型模式Ⅰ(重点)
  7. sublime python调试_如何用sublime调试程序
  8. 巴克莱银行实现敏捷及DevOps与本地文化的融合
  9. 机器学习笔记(二十一):决策边界
  10. 【推荐算法】推荐系统必读论文整理
  11. 偏微分方程数值解法python_微分方程数值方法和偏微分方程有什么区别吗?
  12. Apache Log4j2远程代码执行漏洞复现
  13. KALI2021安装teemo的一些问题
  14. 实现Discord聊天机器人
  15. opening V4L
  16. 大学生期末网页大作业:基于HTML+CSS+JavaScript蓝色的汽车设备营销企业网站模板13页面
  17. 搜狗云输入法、Google手机语音搜索:两款创新云产品
  18. 由中序后序序列求前序序列
  19. windows10计算机无法启动不了,windows10不能开机怎么办
  20. Linux 内核模块及系统监控

热门文章

  1. Google Python代码风格指南
  2. 杨辉三角(超简单的思路)
  3. HPA(Horizontal Pod Autoscaler)弹性伸缩
  4. (附源码)计算机毕业设计SSM福利院管理系统
  5. 产品运营怎么做规划?
  6. Attention Is All You Need (NIPS 2017)
  7. Flutter组件-GridView-网格组件
  8. 大数据Flink电商数仓实战项目流程全解(一)
  9. 北斗授时(GPS时钟服务器)应用云计算网络系统
  10. linux查看kafka版本号