全局记录RabbitMQ的消费者消息日志
还是为了方便不同环境的问题排查,需要记录 消费者收到的所有消息,最好也能记录一下每个消息的处理时长,哈哈。
注:本文的完整代码,已经上传到:Github代码
通过分析springframework.amqp代码,发现
RabbitListener注解的消费者,是通过 SimpleMessageListenerContainer 类在处理监听:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
而 SimpleMessageListenerContainer 类是由 SimpleRabbitListenerContainerFactory 工厂类创建的:
org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
SimpleRabbitListenerContainerFactory 工厂类的父类提供了一个 setAdviceChain 可以添加一些拦截器。
OK,我们就是在这里增加一个拦截器,以记录日志。
为不对项目造成影响,我们不重新定义bean,选择在已有的Bean上设置Advice,关键代码:
@Configuration
public class RabbitConfiguration implements BeanPostProcessor {@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {switch (beanName) {// (bean instanceof SimpleRabbitListenerContainerFactory) 如果项目未引用mq,这里会报错:NoClassDefFoundErrorcase "rabbitListenerContainerFactory":// 修改原有Bean,避免new SimpleRabbitListenerContainerFactory 出现问题SimpleRabbitListenerContainerFactory factory = (SimpleRabbitListenerContainerFactory) bean;Advice myAdvice = new RabbitAdvice();Advice[] adviceList = factory.getAdviceChain();if (adviceList == null || adviceList.length <= 0) {adviceList = new Advice[]{myAdvice};} else {adviceList = Arrays.copyOf(adviceList, adviceList.length + 1);adviceList[adviceList.length - 1] = myAdvice;}factory.setAdviceChain(adviceList);
下面是 RabbitAdvice 关键代码:
@Slf4j
public class RabbitAdvice implements MethodInterceptor {@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {long start = System.currentTimeMillis();try {return invocation.proceed();} finally {long cost = System.currentTimeMillis() - start;StringBuilder sb = new StringBuilder("收到消息, 处理耗时:");sb.append(cost).append("ms\r\n");try {Object[] args = invocation.getArguments();for (Object obj : args) {sb.append(obj).append("\r\n");if (obj instanceof Message) {sb.append("body: ").append(new String(((Message) obj).getBody())).append("\r\n");}}log.debug(sb.toString());} catch (Exception exp) {sb.append("Exception: ").append(exp.getMessage()).append("\r\n");log.error(sb.toString());}}}
}
OK,你的消费者代码该怎么定义,就怎么定义:
@RabbitListener(queues = Producer.QUEUE)
void handler(@Payload Dto dto, @Headers Map<String, Object> headers) {System.out.println("我收到消息了:" + dto);
}
当收到消息时,日志里会出现如下格式的日志:
2020-11-05 11:21:20.604 DEBUG 16412 --- [ntContainer#0-1] b.c.demolograbbitmq.rabbit.RabbitAdvice : 收到消息, 处理耗时:53ms
Cached Rabbit Channel: AMQChannel(amqp://admin@10.2.3.250:5672/,1), conn: Proxy@3676ac27 Shared Rabbit Connection: SimpleConnection@5c77053b [delegate=amqp://admin@10.2.3.250:5672/, localPort= 56004]
(Body:'{"clas":"beinet.cn.demolograbbitmq.DemoLogRabbitmqApplication$$EnhancerBySpringCGLIB$$57eda434","method":null,"para":null,"result":null,"costTime":2147483647,"remark":"这是测试消息","exp":null}' MessageProperties [headers={aaa=bbb, __TypeId__=beinet.cn.demolograbbitmq.util.Dto}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=001.ybl, deliveryTag=1, consumerTag=amq.ctag-RX1jaLWzHpqhPkG9Ma6O7w, consumerQueue=001.ybl])
body: {"clas":"beinet.cn.demolograbbitmq.DemoLogRabbitmqApplication$$EnhancerBySpringCGLIB$$57eda434","method":null,"para":null,"result":null,"costTime":2147483647,"remark":"这是测试消息","exp":null}
全局记录RabbitMQ的消费者消息日志相关推荐
- rabbitmq 查看消费者_RabbitMQ 和 Kafka 的比较
导言 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应该选择RabbitMQ还是Kafka?".基于某些原因, 许多开发者会把这两种技术当做等价的来看待.的确,在一些案例场 ...
- rabbitmq 查看消费者_(Windows环境下)RabbitMQ系列(一)安装以及入门使用
一.RabbitMQ介绍 RabbitMQ是一个消息代理:它接受和转发消息.你可以把它想象成一个邮局.在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员. RabbitMQ和邮局的主要区别在于它不 ...
- rabbitmq怎样确认是否已经消费了消息_【朝夕专刊】RabbitMQ生产者/消费者消息确认...
欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 上篇文章介绍了R ...
- rabbitmq python 消费者_菜鸟世界 -RabbitMQ---消费者示例
1.消费者的作用 消费者是勤勤恳恳的劳动者,它一直等待RabbitMQ给它分配任务,加入有100个任务,这些任务是由一个或是多个生产者生产出来的,现在,这些任务都放在RabbitMQ里. 消费者启动后 ...
- RabbitMQ多消费者消息分配
一. 轮询分配 当有多个消费者同时监听一个队列时,RabbitMQ默认将消息逐一顺序分配给各消费者,该消息分配机制称为轮询(Round-Robin). 为验证该机制,建立两个消费者,同时监听同 ...
- RabbitMQ:消费者ACK机制、生产者消息确认
文章目录 基础案例环境搭建: 环境: 1. 生产者发送消息确认 1.1 confirm 确认模式 1.2 return 退回模式 源代码 1.1.3 小结 2. 消费者签收消息(ACK) 2.1 代码 ...
- RabbitMQ的消费者处理消息失败后之重试3次,重试3次仍然失败发送到死信队列。
1.为什么要重试? 如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失.所以我们要在消费者处理消息失败的时候,重试一定的次数.比 ...
- RabbitMQ:消费者和生产者。
如果你曾经在工作中使用过网络软件,脑海中应该会有客户端和服务器端的概念.不管是浏览器和Web服务器,还是应用程序和MySQL服务器,都是其中一方发送请求,而另一方服务这些请求.你可以将其视为快餐车模式 ...
- Springboot整合 rabbitMQ 异步写消息日志
1.配置属性文件,包括连接客户端 配置并发数 2.配置队列名.交换机名.路由关键字信息 3.配置RabbitMqconfig中配置工厂模式,分为两种单一和多消费者模式 4.从配置文件中读取配置,实例队 ...
最新文章
- [2010国家集训队]Crash的旅游计划
- 阿里云中间件首席架构师李小平:云原生实践助力企业高效创新
- BarTender操作遇到OLE DB遇到了错误0x80004005”的问题
- Linux CentOS 7 安装 JAVA(jdk-8u181-linux-x64)
- 工具解析:杀毒引擎惨遭打脸,黑帽大会爆惊天免杀工具
- vue2.0click点击事件修饰符stop阻止单击事件冒泡prevent阻止默认事件
- ACM竞赛入门,从零开始
- MCU远程升级方案,可解决升级错误死机问题
- 数学建模常用模型22:回归模型
- 3月24日服务器维护公告,梦幻西游3月24日更新了什么_梦幻西游3月24日维护及系统调整公告_游戏堡...
- 【NOI2015】小园丁与老司机
- Unity TimeLine丨A1.创建TimeLine、Animation Track,Extrapolation属性讲解
- ZooKeeper【基础知识 02】zookeeper-3.6.0 常用Shell命令(节点增删改查+监听器+四字指令)
- LeGo-LOAM激光雷达定位算法源码阅读(二)
- PowerSensorAI 3 从现有模型迁移训练 - 五花分类
- MP4学习(五)ts-mp4源码阅读(3)ftyp box的解析
- E.A. Guillemin 吉耶曼统一古典电路理论的人
- 网络安全学习--域(一)
- post json数据
- 100集华为HCIE安全培训视频教材整理 | 双向NAT技术