消息队列 | RocketMq某些队列阻塞不消费
原文地址:https://www.shared-code.com/article/156
公司小伙伴反馈自己负责的RocketMq集群忽然有两个队列不消费了,消息堆积达到了1万多条,这个肯定不正常。
以下是当时的消费组的实际消费情况
从上面的图中可以看出来,有两个队列严重阻塞了,好久没有上报过offset了。
通过以往自己的经验,一般来说,RocketMq部分队列消费失败,主要有以下三个原因
- 消费者的集群数量有问题,比如之前笔者有写过一篇文章,也是介绍RocketMq部分队列不能正常消费的,有兴趣的可以看看RocketMq 部分队列不能消费问题排查
- 消费者的consumer group乱用,就是明明你不消费这个topic , 但是你的消费组名称取的跟别人正常消费的一样,这个时候,就会导致一部分队列分配到你这个节点身上,但是这些节点你本身又没有去消费,就会造成消费混乱
- 消费者线程阻塞
上面列的三点,基本上罗列了笔者遇到的情况以及能想到的情况,欢迎大家补充!
通过逐一排查,第一点和第二点的可能性几乎没有,大家用的都还算比较规范,那就怀疑第三点了。
判断一个线程是否阻塞,最好的办法就是通过jstack
命令查看整个线程栈的信息。
这里笔者使用 https://gceasy.io/ 来做线程分析
第一次拉取
第二次拉取
拉了两次线程栈信息,清楚的可以看到两次都有5个rocketMQ消费端线程处于运行状态, 这个就有很大的问题。接下来看下栈执行的代码,
ConsumeMessageThread_20
priority:5 - threadId:0x00007fcbbdca7000 - nativeId:0x3675 - nativeId (decimal):13941 - state:RUNNABLE
stackTrace:
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
at sun.security.ssl.InputRecord.read(InputRecord.java:532)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
- locked <0x00000000cceb4ab8> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000cceb6098> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x00000000cceb6070> (a java.io.BufferedInputStream)
at sun.net.www.MeteredStream.read(MeteredStream.java:134)
- locked <0x00000000ccecbe30> (a sun.net.www.MeteredStream)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ccecab68> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.xxx.utils.GaodeUtil.readFileByUrl(GaodeUtil.java:87)
at com.xxx.utils.GaodeUtil.getDirection(GaodeUtil.java:73)
at com.xxx.utils.GaodeUtil.getDistanceByLngLat(GaodeUtil.java:198)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
5个线程的栈信息几乎一致,可以看到都是堵在了 GaodeUtil.readFileByUrl
这个方法上面,最终底层是堵在了socketRead上面。
找到了业务代码就很好办了,直接去代码里面查看代码。
private String readFileByUrl(String urlString) {try {StringBuffer html = new StringBuffer();URL url = new URL(urlString);HttpURLConnection conn = (HttpURLConnection)url.openConnection();InputStreamReader isr = new InputStreamReader(conn.getInputStream(), "UTF-8");BufferedReader br = new BufferedReader(isr);String temp;while ((temp = br.readLine()) != null) {html.append(temp).append("\n");}br.close();isr.close();return html.toString();} catch (Exception e) {log.error("GaodeUtil.readFileByUrl error:", e);return null;}}
readFileByUrl
的代码如上, 这帮兔崽子写代码都不仔细思考的,这个很明显可以看出来问题,在发起http连接的时候,没有设置超时时间。
HttpURLConnection是基于HTTP协议的,其底层通过socket通信实现。如果不设置超时(timeout),在网络异常的情况下,可能会导致程序僵死而不继续往下执行
好了,问题查清楚了, 是由于消费消息处理业务的时候,发起http请求,没有设置超时时间,导致在出现网络异常的情况下,程序直接僵死。
解决办法就是 重启机器(恢复业务运行), 修改代码,增加超时时间。
欢迎关注我的公众号:sharedCode
消息队列 | RocketMq某些队列阻塞不消费相关推荐
- rocketmq 重复消费_消息队列 RocketMQ
引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...
- rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...
- rocketmq python 某个队列不消费_消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?...
关于 消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?的搜索结果 回答 2021一月拼团已有400余人拼团成功最低一折 点击进入:一月新人专场 服务器配置时间价格1核2G1年84元1核 ...
- 消息队列之事务消息,RocketMQ 和 Kafka是如何做的?
一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID. 通常我们理解的事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统 ...
- 芋道 Spring Boot 消息队列 RocketMQ 入门
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- 分布式消息队列RocketMQ 快速入门
分布式消息队列RocketMQ 一 RocketMQ概述 概述 1.MQ简介 MQ,Message Queue,是一种提供消息队列服务的中间件,是一套提供了消息生产.存储.消费全过程API的软件系统. ...
- 什么是消息队列 RocketMQ 版?
消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟.高并发.高可用.高可靠的分布式消息中间件.消息队列 RocketMQ 版既可为分布式应用系统提供异步解耦和削峰 ...
- 消息队列RocketMQ应对双十一流量洪峰的“六大武器”
作者:不周 审核校对:岁月.明锻 编辑&排版:雯燕 " 4982 亿,58.3 万笔/秒 "的背后 在新冠肺炎疫情催化下,数字化生活方式渐成新常态."4982 亿 ...
- 基于消息队列 RocketMQ 的大型分布式应用上云较佳实践
作者|绍舒 审核&校对:岁月.佳佳 编辑&排版:雯燕 前言 消息队列是分布式互联网架构的重要基础设施,在以下场景都有着重要的应用: 应用解耦 削峰填谷 异步通知 分布式事务 大数据处理 ...
最新文章
- “编程能力差!90%输在这点上”谷歌AI专家:其实都是瞎努力!
- 一个项目在启动前都应该明确哪些事
- CPU+GPU异构计算完全解析
- 数据链路层(学习笔记)
- 短信广告中虚拟网关发送与电信网关发送的区别
- GOF之结构型模式Ⅰ(重点)
- sublime python调试_如何用sublime调试程序
- 巴克莱银行实现敏捷及DevOps与本地文化的融合
- 机器学习笔记(二十一):决策边界
- 【推荐算法】推荐系统必读论文整理
- 偏微分方程数值解法python_微分方程数值方法和偏微分方程有什么区别吗?
- Apache Log4j2远程代码执行漏洞复现
- KALI2021安装teemo的一些问题
- 实现Discord聊天机器人
- opening V4L
- 大学生期末网页大作业:基于HTML+CSS+JavaScript蓝色的汽车设备营销企业网站模板13页面
- 搜狗云输入法、Google手机语音搜索:两款创新云产品
- 由中序后序序列求前序序列
- windows10计算机无法启动不了,windows10不能开机怎么办
- Linux 内核模块及系统监控