1.消息可靠性

三种丢失的情形:

1.1  生产者确认机制

启动MQ

创建Queues:

两种Callback:

1.ReturnCallback:全局callback 

2.ComfirmCallback: 发送信息时候设置

 

 @Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {// 1.准备消息String message = "hello, spring amqp!";// 2.准备CorrelationData// 2.1.消息IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 2.2.准备ConfirmCallbackcorrelationData.getFuture().addCallback(result -> {// 判断结果if (result.isAck()) {// ACKlog.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());} else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, ex -> {// 记录日志log.error("消息发送失败!", ex);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);}

执行成功:

监控页面:

模拟失败:

1.投递到交互机失败

2.投递到交换机了,但是没有进入队列

 

1.2 消息持久化

注意: 生产者确认只能保证数据放到队列当中,但是无法保证数据不丢失(比如所在的机器宕机了),
所以还需要保证数据的持久化

@Configuration
public class CommonConfig {@Beanpublic DirectExchange simpleDirect(){return new DirectExchange("simple.direct");}@Beanpublic Queue simpleQueue(){return QueueBuilder.durable("simple.queue").build();}
}
@Test
public void testDurableMessage() {// 1.准备消息 消息持久化Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.发送消息rabbitTemplate.convertAndSend("simple.queue", message);
}

注意:

    //交换机不传值默认就是持久化  //交换机、队列、消息默认都是持久化   @Beanpublic DirectExchange simpleDirect(){return new DirectExchange("simple.direct");}public AbstractExchange(String name) {this(name, true, false);}

演示数据是否默认持久化: 

重启mq:

 1. 交互机、队列、消息都做持久化

2.消费者端关闭防止被消费

3.重启mq后看队列中数据是否还在(是否持久化)

1.3  消费者消息确认

生产者确认:能确定消息投递到队列
消息持久化:能避免MQ宕机造成的消息丢失
生产者确认和消息持久化能保证消息能投递到消费者,但是无法保证消息被消费者消费(比如投递消费者的
同时,消费者所在机器宕机了)

1.manual:不推荐 代码侵入
try{//业务逻辑ack
} catch(ex){nack
}
2.auto:推荐 spring全权完成,不需要手动写代码
3.none:不推荐 投递完成立马删除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener { @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {log.debug("消费者接收到simple.queue的消息:【" + msg + "】");//模拟出现异常情况System.out.println(1 / 0);log.info("消费者处理消息成功!");}
}

默认为none:抛出异常后消息立即被删除:

修改为auto模式:

队列返回nack会再去发送信息:

1.4 失败重试机制

演示失败重试机制:

listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: trueinitial-interval: 1000multiplier: 3max-attempts: 4

默认重试到达最大次数后消息就丢弃:

但是对于一些比较重要不能丢弃的消息需要使用以下策略:    

推荐使用第三种方案:将失败的消息发送到失败的交换机和失败的队列中,后面可以告知管理员然后重新
人工去处理

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorMessageBinding(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

演示:

发送消息:

 

面试题:最后一分钟的总结

2. 死信交换机

2.1  初识死信交换机

1.发送信息到消费者默认的retry重试机制,达到最大次数就会被reject
2.队列中绑定一个死信交换机,接收被reject的信息,然后发送到dl.queue
3.这样就不担心死信会丢失

对比消息失败信息处理策略:

2.2  TTL

注意: 存活时间取消息所在队列中存货时间 、消息本身存活时间的以短的时间为准

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"))public void listenDlQueue(String msg) {log.info("消费者接收到了dl.queue的延迟消息");}
}

@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}
}

    @Testpublic void testTTLMessage() {// 1.准备消息Message message = MessageBuilder.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();// 2.发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);// 3.记录日志log.info("消息已经成功发送!");}

演示延时队列:

1.启动消费者

2.发送消息:testTTLMessage()

2.3 延迟队列

p159 27:18

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String msg) {log.info("消费者接收到了delay.queue的延迟消息");}
}

    @Testpublic void testSendDelayMessage() throws InterruptedException {// 1.准备消息Message message = MessageBuilder.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();// 2.准备CorrelationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.info("发送消息成功");}

演示延时队列:

1.启动消费者

2.运行testSendDelayMessage

报错原因:消息没有做路由

如何不报错:添加延迟的判断:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay();if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}// 记录日志log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}
}

高级篇-rabbitmq的高级特性相关推荐

  1. Java高级篇-----jdk1.8新特性

    目录 1.Lambda表达式 1.需求分析 2.Lambda表达式初体验 3.Lambda的语法规则 3.1.练习无参无返回值的Lambda表达式 3.2.练习有参数且有返回值的Lambda表达式 3 ...

  2. mysql高级篇(二)mysql索引优化分析

    mysql高级篇笔记 mysql高级篇(一)mysql的安装配置.架构介绍及SQL语句的复习. mysql高级篇(二)mysql索引优化分析. mysql高级篇(三)查询截取分析(慢查询日志).主从复 ...

  3. 一文带你全面解析postman工具的使用(高级篇)

    说明:由于前面的一文篇幅太大,导致无法放在一文发布,故这篇文章只是postman工具介绍的最后一部分,若回看第一部分内容:一文带你全面解析postman工具的使用(基础篇) 若回看第二部分: 一文带你 ...

  4. RabbitMQ学习笔记(高级篇)

    RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...

  5. java rabbitmq 并发_RabbitMQ消息中间件 高级篇二 高并发情况下保障消息投递可靠性...

    RabbitMQ消息中间件技术精讲9 高级篇二 高并发场景下,消息的延迟投递做二次确认进行回调检查来保障生产者消息投递成功的可靠性 在上一篇文章中,我们介绍了BAT大厂中一种方式保障生成者消息投递可靠 ...

  6. 【微服务】RabbitMQ部署高级篇

    RabbitMQ部署高级篇 1.单机部署 1.1.下载镜像 1.2.安装MQ 2.安装DelayExchange插件 2.1.下载插件 2.2.上传插件 2.3.安装插件 3.集群部署 2.1.集群分 ...

  7. 服务异步通讯(rabbitmq的高级特性)

    服务异步通讯(rabbitmq的高级特性) MQ的一些常见问题 消息可靠性问题:如何确保发送的消息至少被消费一次. 延迟消息问题:如何实现消息的延迟问题. 消息堆积问题:如何解决数百万消息堆积,无法及 ...

  8. 谷粒商城--订单服务--高级篇笔记十一

    1.页面环境搭建 1.1 静态资源导入nginx 等待付款 --------->detail 订单页 --------->list 结算页 --------->confirm 收银页 ...

  9. 谷粒商城之高级篇知识补充

    谷粒商城高级篇之知识补充 前言 本篇主要是完成谷粒商城高级篇开发时,我们需要了解并学习一部分补充的知识,才能更好的完成商城业务. 以后我们将商城任务和额外知识分开来编写,方便商城业务的连贯性. 下面是 ...

最新文章

  1. 【每日DP】day 10、P1005 矩阵取数游戏【区间DP+高精(python)】难度⭐⭐⭐★
  2. 【Python初学者】准备
  3. HNUSTOJ 1601:名字缩写
  4. 澳大利亚量子计算机获突破 首次实现简化逻辑门
  5. django框架下celery+rabbitmq+flower完成异步任务
  6. layer中嵌套的页面如何操作父页面_layui框架中layer父子页面交互详细解说
  7. JavaScript call()函数的应用
  8. 小程序点击显示隐藏(点击标题,内容显示,再次点击隐藏,同时切换箭头的状态,且默认第一组的内容显示)
  9. 浅谈实现SQL Server远距离异地容灾
  10. ceph:如何处理rados --striper上传失败的对象
  11. JNI学习-- C调用java方法
  12. 一、首页、详情页、文章编辑页制作《iVX低代码/无代码个人博客制作》
  13. 使用Stream编译出现的stream has already been operated upon or closed的问题
  14. BB8700 bowser net
  15. Linux执行命令常见的英语语句
  16. SRM 558 SurroundingGame
  17. 服务器cpu性能最大值,服务器cpu性能排行
  18. 经验谈|如何处理好产品与开发的关系
  19. maven+spring mvc+mybatis+redis+dubbo+zookeeper
  20. 中文语法纠错论文解读(一)

热门文章

  1. JavaScript走向成熟
  2. IEEE 754规格化浮点数所能表示的最大值和最小值
  3. python自动交易 缠论_学两年缠论,不如用一年时间把缠论的买卖点用通达信写出来。...
  4. leetcode -874 - 模拟行走机器人 - java版
  5. android 调用锁屏,Android反射调用goToSleep实现一键锁屏、亮屏
  6. 通用Mapper和PageHelper 快速开始
  7. 【MySQL】MySQL如何合并多行数据,行转列,group_concat 多行合并
  8. linux在终端打开dbi,Linux下安装DBI和DBD
  9. [填坑]在校大学生如何申请软件著作权
  10. 目标检测相关 Faster RCNN yolo SSD