概述

事故经过

由于大量商户反应收不到推送,第一反应是不是推送系统挂了,导致没有进行推送。于是让运维检查推送系统个节点的情况,发现都正常。于是打开RabbitMQ的管控台看了一下,人都蒙了。已经有几万条消息处于ready状态,还有几百条unacked的消息。
以为推送服务和MQ连接断开了,导致无法推送消息,于是让运维重启推送服务,将所有的推送服务重启完,发现unacked的消息全部变成ready,但是没过多久又有几百条unacked的消息了,这个就很明显了能消费,没有进行ack呀。
当时以为是网络问题,导致MQ无法接收到ack,让运维检查了一下,发现网络没问题。现在看真的是傻,网络有问题连接都连不上。由于确定的是无法ack造成的,立马将ack模式由原来的manual改成auto紧急发布。将所有的节点升级好以后,发现推送正常了。
你以为这就结束了其实没有,没过多久发现有一台MQ服务出现异常,由于生产采用了镜像队列,立即将这台有问题的MQ从集群中移除了。直接进行重置,然后加回集群。这事情算是告一段落了。此时已经接近24:00了。
时间来到第二天上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,并且占用率很高。

事故重现-队列阻塞

MQ配置

spring:# 消息队列rabbitmq:host: 10.0.0.53username: guestpassword: guestvirtual-host: localport: 5672# 消息发送确认publisher-confirm-type: correlated# 开启发送失败退回publisher-returns: truelistener:simple:# 消费端最小并发数concurrency: 1# 消费端最大并发数max-concurrency: 5# 一次请求中预处理的消息数量prefetch: 2# 手动应答acknowledge-mode: manual

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,  @Headers Map<String,Object> headers, Channel channel)  throws Exception {// 解密和解析String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);try {// 模拟推送pushMsg(orderDto);}catch (Exception e){log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));} finally {// 消息签收channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);}
}

看起来好像没啥问题。由于和交易系统约定好,订单数据需要先转换json串,然后再使用AES进行加密,所以这边需要,先进行解密然后在进行解析。才能得到订单数据。
为了防止消息丢失,交易系统做了失败重发机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行ack。

模拟推送

# 发送3条正常的消息
curl http://localhost:8080/sendMsg/3
# 发送1条错误的消息
curl http://localhost:8080/sendErrorMsg/1
# 再发送3条正常的消息
curl http://localhost:8080/sendMsg/3


观察日志发现,虽然有报错,但是还能正常进行推送。但是RabbitMQ已经出现了一条unacked的消息。

继续发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

这个时候你会发现控制台报错,当然错误信息是解密失败,但是正常的消息却没有被消费,这个时候其实队列已经阻塞了。


从RabbitMQ管控台也可以看到,刚刚发送的3条消息处于ready状态。这个时候如果一直有消息进入,都会堆积在队列里面无法被消费。
再发送3条正常的消息。

curl http://localhost:8080/sendMsg/3

原因分析

上面说了是由于没有进行ack导致队列阻塞。那么问题来了,这是为什么呢?其实这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。
RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置Prefetch数量来实现。
举例说明:可以理解为在consumer前面加一个缓冲器,容器能容纳最大的消息量就是prefetch count。如果容器没有满RabbitMQ就会将投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

listener:simple:# 消费端最小并发数concurrency: 1# 消费端最大并发数max-concurrency: 5# 一次处理的消息数量prefetch: 2# 手动应答acknowledge-mode: manual

prefetch参数就是Prefetch Count。

通过上面的配置发现prefetch我只配置了2,并且concurrency配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这两条消息一直没有被ack。将缓冲区占满了,这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给他推送消息了,所以就造成了队列阻塞。

判断队列是否有阻塞风险

当ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。
channelCount就是由concurrency,max-concurrency决定的。

  • min = concurrency * prefetch * 节点数量。
  • max = max-concurrency * prefetch * 节点数量。
    由此可以得出结论。
  • unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
  • unacked_msg_count >= min 可能会出现阻塞。
  • unacked_msg_count >= max 队列一定阻塞。

解决方案

其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。
对于这个就需要有日志监控系统,来及时告警了。

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception {try {// 解密和解析String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);   // 模拟推送pushMsg(orderDto);}catch (Exception e){log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);}finally {// 消息签收channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);}
}

注意点

unacked的消息在consumer 切断连接后(比如重启),会自动回到对头。

事故重现-磁盘占用飙升

一开始不知道代码有问题,就是以为单纯的没有进行ack所以将ack模式改成auto自动(acknowledge-mode默认是auto),紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。
其实现在回想起来是非常危险的操作,将ack模式改成auto自动,这样会使QOS不生效。会出现大量消息涌入consumer从而造成consumer宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出问题。

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto, @Headers Map<String,Object> headers, Channel channel) throws Exception {// 解密和解析String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);try {// 模拟推送pushMsg(orderDto);}catch (Exception e){log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);}finally {// 消息签收channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);}
}

配置文件

listener:simple:# 消费端最小并发数concurrency: 1# 消费端最大并发数max-concurrency: 5# 一次处理的消息数量prefetch: 2# 手动应答acknowledge-mode: auto

由于当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,所以还是会发出少量有误的消息。

发送一条错误消息

curl http://localhost:8080/sendErrorMsg/1


原因

RabbitMQ消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。

解决方法

将default-requeue-rejected: false 即可。

参考

生产RabbitMQ队列阻塞该如何处理?

RabbitMQ队列阻塞该如何处理相关推荐

  1. Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列

    内容目录: Gevent协程 Select\Poll\Epoll异步IO与事件驱动 Python连接Mysql数据库操作 RabbitMQ队列 Redis\Memcached缓存 Paramiko S ...

  2. Python开发【十一章】:RabbitMQ队列

    RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或 ...

  3. golang gin 监听rabbitmq队列无限消费

    golang gin 监听rabbitmq队列无限消费 连接rabbitmq package databaseimport ("github.com/streadway/amqp" ...

  4. python rabitmq_python RabbitMQ队列使用

    原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...

  5. [RabbitMQ]队列持久化

    RabbitMQ持久化 概念 如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失.默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做. ...

  6. RabbitMq队列 queue

    目录 RabbitMq队列 消息确认机制 负载均衡 生产者代码 消费者1 消费者2 RabbitMq队列 在上篇文章中讲了mq的队列,这篇用代码实现.在例子中存在一个生产者,和两个消费者.生产者将生产 ...

  7. 队列阻塞_Java并发|阻塞队列ArrayBlockingQueue解析

    之前的文章我们学了 ConcurrentHashMap. ConcurrentLinkedQueue 等线程安全容器,而且也说了 Java并发包中的 Concurent 开头的并发容器都是非阻塞的,是 ...

  8. rabbitmq队列中消息过期配置

    最近公司某个行情推送的rabbitmq服务器由于客户端异常导致rabbitmq队列中消息快速堆积,还曾导致过内存积压导致rabbitmq客户端被block的情况.考虑到行情信息从业务上来说可以丢失部分 ...

  9. 【日常问题】记录一次UAT环境消息队列阻塞问题

    2022-7-27,测试人员反馈:运单揽收无法生成运单,查看了一下是因为队列阻塞了,原因是:自己在Rabbit客户端发送消息,没有考虑到消费端接受消息的condition,导致消息一直处于Unack状 ...

最新文章

  1. 生信分析-本地BLAST
  2. 混合云存储开启企业上云新路径--阿里云混合云备份容灾方案发布 1
  3. java枚举使用示例
  4. Linux基础-查看文件与目录
  5. linux kvm虚拟机配置及常见问题处理
  6. 算法 - 二分查找(非递归实现二分查找)
  7. Win11桌面没有图标怎么解决 Win11桌面没有图标解决教程
  8. 利用好手头的资源解决海量语料资源收集以及利用哈工大的LTP云平台解决依存句法和语义依存分析
  9. 从初创型到独角兽企业,监控架构演进的那些事儿
  10. 总奖金高达180万元 蚂蚁金服启动区块链创新大赛
  11. Java中如何使用匿名内部类?
  12. 面经——操作系统(linux为例)
  13. matlab coefs,MATLAB小波分析工具箱常用函数
  14. 飞机大战h5微信小游戏代码
  15. 爬取沪江网考研词汇并按要求存为txt
  16. 文件系统--open系统调用详解
  17. CTF网络安全大赛学习笔记1010
  18. 条件运算符(三目运算符)
  19. Java数据类型之Java数据类型的划分方式
  20. 1、 网络营销常见单词缩写

热门文章

  1. oracle系统视图更新,Oracle内联视图更新遇到的问题
  2. rc4加密算法 php,php 实现RC4加密解密
  3. 第三方(秒嘀)短信验证码登陆 demo
  4. wap端开发必须基础
  5. Win10安装Cygwin,并安装GCC等软件包
  6. 推荐一个去除图片人物背景的在线网站 Removebg
  7. 解决vue-cli默认使用yarn或者npm的问题
  8. IDEA告警详解:Optional.isPresent can be replaced with functional-style expression
  9. 【Android】开源项目汇总-备用 各种图形的绘制,各种效果
  10. FILETIME或时间戳转换日期