RabbitMQ之手动应答消息

1.为什么需要手动应答

当消费者完成一个任务需要一段时间,如果其中一个消费者处理一个长的任务并且只处理了部分突然他挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该条消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为他无法接收到。

2.应答分类

2.1手动应答
相应的,使用手动应答时,需要把autoAck属性设置为false,然后进行手动应答。
消息手动应答 有如下几个方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ已知道该消息并且成功的处理消息, 可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与Channel.basicNack相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
手动应答时还有一个参数:Multiple 是否批量处理,一般选择false ,不批量处理。
2.2自动应答
自动应答只需要设置这个属性为true,rabbitmq就会开启自动应答。但是使用自动应答时需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,也有可能因为没有对传递的消息数量进行限制,导致消息积压,有可能把内存耗尽。

使用手动应答的好处?

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。
手动应答生产者(Producer)代码实现:

@Slf4j
public class ExerciseT {private static final String QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.121.36");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入信息");while (scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());log.info("生产者发出消息: "+message);}}
}

消费者1代码:

@Slf4j
public class ExerciseW2 {private static final String QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.121.36");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody(), "UTF-8");log.info("W2接收到消息:" + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}//手动应答消息//1st param message tag   2.multiple: true 批量应答  false:不批量应答,实际开发中//使用falsechannel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = consumerTag -> {log.info("W2取消消息发送" + consumerTag);};channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);}
}

消费者2代码

@Slf4j
public class ExerciseW3 {private static final String QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.121.36");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);DeliverCallback deliverCallback = (consumerTag, message) -> {String msg = new String(message.getBody(), "UTF-8");log.info("W3接收到消息:" + msg);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}//手动应答消息//1st param message tag   2.multiple: true 批量应答  false:不批量应答,实际开发中//使用falsechannel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = consumerTag -> {log.info("W3取消消息发送" + consumerTag);};channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);}
}

然后把生产者代码和两个消费者都run,
刚开始生产者发送两条消息,然后两个消费者也是
轮询接收消息


然后生产者连续发送多条消息,可以看到消费者2消息了两条消息,然后关闭消费者2,剩余的消息全部被消费者1全部消费了。就证明了上面论述使用手动应答可以自动重新入队,所以不会出现消息丢失的情况。

RabbitMQ之手动应答消息(消息不丢失)相关推荐

  1. RabbitMQ之消息的自动应答、手动应答和消息持久化(Java开发)

    1.消息的自动和手动应答 boolean autoAck = true;//消息自动应答 channel.basicConsume(WQ_QUEUE,autoAck,consumer); 默认情况下, ...

  2. [RabbitMQ]消息应答概念_消息手动应答代码

    消息应答 概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为 ...

  3. Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化

    Hello模式 在idea中新建一个空工程 设置项目 添加模块 选择模块类型 设置模块 在pom文件中导入jar包依赖 书写生产者代码: public class HelloProduct {// 创 ...

  4. RabbitMQ消息手动应答生产者

    /** 消息在手动应答时是不丢失.放回队列中重新消费* */public class Task2 {// 队列名称public static final String TASK_QUEUE_NAME ...

  5. Rabbitmq手动应答代码实现与测试

    前言 rabbitmq作为现在流行的消息队列,它拥有流量削峰.应用解耦.异步处理等优点,使用数量也是较多的.其中重要的特性也就是手动应答避免消息丢失的特点更是使其更上一层楼.消息队列基础的处理流程是: ...

  6. MQ消息的自动应答和手动应答| RabbitMQ系列(三)

    相关文章 RabbitMQ系列汇总:RabbitMQ系列 前言 开始消息应答之前先思考几个问题 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会 ...

  7. RabbitMQ消息应答实战(针对自动|手动应答常见问题进行模拟)

    消息应答概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况.RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除 ...

  8. SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)

    目录 1.环境搭建 2.队列模式 3.发布订阅模式 4.路由模式 5.主题模式 6.消息手动应答机制 7.回调函数-确认机制(发布确认模式) 1.环境搭建 引入pom: <!-- rabbitM ...

  9. RabbitMQ消息手动应答消费者

    /** 消费者* */public class Worker03 {// 队列名称public static final String TASK_QUEUE_NAME = "ack_queu ...

最新文章

  1. 第1条:考虑用静态工厂方法代替构造器
  2. BZOJ 1711: [Usaco2007 Open]Dining吃饭
  3. 使用python进行数据清洗常用的库_python3常用的数据清洗方法(小结)
  4. 开源 免费 java CMS - FreeCMS1.7 栏目管理
  5. oracle 英文版安装,Oracle10gR2 on RHEL4 x86_64安装技术文档(英文原版)
  6. json中含有Unicode的处理办法 C#
  7. java中tcp传图片_Java学习之TCP上传图片
  8. 杭州intel服务器维护,服务器应该怎么去维护?
  9. 【路径规划】基于matlab RRT算法求解机器人避障路径规划问题【含Matlab源码 319期】
  10. 微信开发模式api接口文档简介
  11. 城联优品入股浩柏国际进军国际资本市场,已完成第一步
  12. Customizing Navigation Bar and Status Bar in iOS 7
  13. qq邮箱 实现邮件的发送
  14. python编程玩具有哪些_python 全栈开发,Day133(玩具与玩具之间的对话,基于jieba gensim pypinyin实现的自然语言处理,打包apk)...
  15. php打开excel文件,PHP读取Excel文件的简单示例
  16. 推荐系统笔记(MAB问题)
  17. 关于canvas画图,填充颜色,添加文字
  18. 软件项目管理系统-经费报销-出差费
  19. CSP CCF: 202112-3 登机牌条码 (C++)
  20. IDEA2020版导入tomcat的jar包到eternal libraries时遇到的问题

热门文章

  1. CVE-2022-0185 价值$3w的 File System Context 内核整数溢出漏洞利用分析
  2. 【云周刊】第124期:实时计算来临!阿里新一代实时计算引擎 Blink,每秒支持数十亿次计算
  3. 高级工程师职称计算机要求,2017年中高级工程师职称评定要求及条件
  4. 高级软考之——系统分析师思维导图(四)
  5. 异或相关知识 xor (草稿)
  6. 北京银行聚焦零售转型 多点发力助推“京彩”缤纷
  7. mac电脑卸载安装node(不同版本),安装淘宝镜像
  8. 【转贴】不理财,你的家庭能走多远!
  9. RCNN系列1:RCNN介绍
  10. linux中编辑pdf文件,Linux下PDF操作与转换