RabbitMQ 消费者回执和发布确认
为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的。
由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功。因此,publisher 和 consumer 都需要一种机制,来确保消息投递成功了和消息消费成功了。
在 AMQP 0-9-1 中,消费者处理完消息后返回 acknowledgement,被称为消费者回执(Consumer Acknowledgement)。中间件收到生产者的消息后返回 acknowledgement,被称为发布确认(Publisher Confirm)。
无论是把消息从生产者投递到 RabbitMQ 节点,还是把消息从 RabbitMQ 节点投递到消费者,为了保证消息投递的可靠性和安全性,Publisher Confirm 和 Consumer Acknowledgement 都是必需的。
消费者回执
当 RabbitMQ 把消息投递给消费者,它需要知道消费者是否成功接收并处理了消息。什么样的逻辑是最优的取决于你的系统,因此,它主要是由应用程序根据实际情况来决定的。
在 AMQP 0-9-1 中,消费者回执是在调用 basic.consume 方法或 basic.get 方法时实现的。
其实,在 RabbitMQ 入门指南(二)中提到的 Message acknowledgment 就是消费者回执。
Delivery Tag
Delivery Tag 就是 Message Delivery 的标识符,它是一个 64 位的长整型值。
当消费者注册成功后,RabbitMQ 将会调用 basic.deliver 方法,该方法会携带唯一的 delivery_tag,将消息发送给消费者。
delivery_tag 的作用域是进行通信的 channel(信道),因此,消息回执必须在相同的 channel 中进行。否则,会导致 unknown delivery tag
异常。
回执模式
根据使用的回执模式(acknowledgement mode),RabbitMQ 会考虑消息是否投递成功了。
如果没有开启手动回执功能(默认关闭),消息一旦发送出去,RabbitMQ 就认为消息投递成功了。
如果开启了手动回执功能,只有当 RabbitMQ 收到了消费者手动返回的 ack 时,才会认为消息投递成功了。
将 autoAck 参数设置为 false,即表示开启手动回执功能。 这时,RabbitMQ 队列中的消息分为了两个部分:
- 一部分是等待发送给消费者的消息(Ready);
- 另一部分是已经发送给消费者但还未收到消费者的 ack 信号的消息(Unacked)。
手动发送的回执,可以是 positive(乐观的),也可以是 negative(悲观的)。你可以使用下面的方法来手动发送回执。
- basic.ack 用于发送 positive ack,明确表示消费者成功地处理了消息,消息会被 RabbitMQ 删除。
- basic.reject 用于发送 negative ack,明确表示消费者没有成功处理消息,即拒绝消息。其中,requeue 参数如果为 false,RabbitMQ 会删除该消息;如果为 true,RabbitMQ 会让该消息重新进入队列,以便重新投递给消费者。basic.reject 每次只能拒绝一条消息。
- basic.nack 和 basic.reject 的功能类似,但 basic.nack 支持 multiple 参数,它可以一次拒绝多条消息。
手动发送回执通常会结合 prefetch(预读取)功能一起使用。
注意: 如果将 requeue 设置为 false,可以启用【死信队列】的功能。死信队列可以通过检测被拒绝或未送达的消息来追踪和处理问题。
发布确认
当生产者将消息发送出去之后,消息是否能成功地到达服务器呢?
默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说生产者根本不知道消息有没有成功到达 RabbitMQ 服务器。那么,消息在到达服务器之前,就可能出现丢失的问题。
为了解决这个问题,RabbitMQ 提供了两种解决方案:
- 事务机制。
- 发布确认机制。
事务机制
与事务机制相关的方法有三个:
- channel.txSelect 用于将当前的信道设置成事务模式,开启事务。
- channel.txCommit 用于提交事务。
- channel.txRollback 用于回滚事务。
在开启事务之后,便可以发布消息给 RabbitMQ,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这时我们便可以捕获异常,并执行 channel.txRollback 方法回滚事务。
注意: RabbitMQ 中的事务机制与 MySQL 中的事务概念并不相同。
事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有当消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,同时可以重发消息。
但是,使用事务机制会严重降低 RabbitMQ 的消息吞吐量。为此,RabbitMQ 提供了一个改进方案——发布确认机制(生产者确认机制)。
发布确认机制
发布确认机制(Publisher Confirm):生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会被指定一个唯一的 ID(从 1 开始),一旦消息被发送给所匹配的队列,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样,生产者便知道消息已经正确到达了 RabbitMQ 服务器。
如果消息和队列是持久化的,那么,Basic.Ack 会在消息写入磁盘之后发出。
事务机制在一条消息发送之后会阻塞发送端,需同步等待 RabbitMQ 的回应,收到回应后才能继续发送下一条消息。相比之下,发布确认机制的优点在于它可以是异步的,一旦发布一条消息,生产者就可以在等待信道返回确认的同时继续发送下一条消息,当消息得到确认之后,生产者便可以通过回调方法来处理该确认消息(Basic.Ack),如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令。
生产者通过调用 channel.confirmSelect 方法,将信道设置为confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。后续该信道中发送的消息都会被 RabbitMQ 服务器 ack 或 nack 一次。
生产者通过调用 channel.waitForConfirms 方法判断是否收到 了RabbitMQ 服务器 的 ack 或 nack。
注意事项:
- 事务机制和发布确认机制是互斥的,不能在同一个信道中共存。
- 事务机制和发布确认机制确保的都是把消息成功地发送给 RabbitMQ 的交换器。如果此交换器没有匹配的队列,那么消息也会丢失。发送方可以配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
- 发布确认机制的优势在于它并不一定需要同步确认。
发布确认机制可以采用以下两种方式来提升 confirm 的效率。
- 批量 confirm:每发送一批消息后,调用channel.waitForConfirms 方法,等待服务器的确认返回。相比普通的 confirm,批量 confirm 极大地提升了confirm 的效率,但问题在于出现 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能或许是不升反降的。
- 异步 confirm:提供一个回调方法,服务端确认了一条或多条消息后,客户端会回调这个方法进行处理。
异步 confirm
异步 confirm 的逻辑实现比较复杂。
Channel 中提供了 addConfirmListener 方法,可以添加 ConfirmListener 这个回调接口。
ConfirmListener 接口包含两个方法:handleAck 和 handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。
这两个方法中都包含 delivery_tag 参数(用来标记消息的唯一有序序号) 。我们需要为每一个信道维护一个 unconfirm 的消息序号集合,每发送一条消息,集合中的元素就加 1。每当调用 ConfirmListener 中的 handleAck 方法时,unconfirm 集合就删掉相应的一条(multiple 设置为 false)或者多条(multiple 设置为 true)记录。 unconfirm 集合最好采用有序集合 SortedSet 的存储结构。
RabbitMQ 消费者回执和发布确认相关推荐
- RabbitMQ(三)发布确认
4.1 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队 ...
- RabbitMQ(三)发布确认 Publisher Confirms
代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...
- springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失
目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...
- 十、RabbitMQ发布确认高级
RabbitMQ发布确认高级 发布确认SpringBoot版本 发布确认Springboot版本 简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制. 介 ...
- RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等
RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...
- RabbitMQ入门(三)消息应答与发布确认
前言: 消息应答与发布确认都是保证消息不丢失.而重复消费问题则是消息幂等性.(之后会说幂等性) 消息应答: 应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理 ...
- RabbitMQ 消费者确认(Consumer Acknowledgements)
摘要 RabbitMQ关于数据安全有两个特性:发布者确认(Publisher Confirms)和消费者确认(Consumer Acknowledegements).前者用于MQ服务器(broker) ...
- RabbitMQ异步发布确认
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是 ...
- RabbitMQ单个发布确认
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候 ...
最新文章
- python类不支持多继承_Java和C#等不允许多继承类,但是Python是可以的
- Mybatis常见面试题(三)
- 楼燚(yì)航的blog URL
- 9.1 图像处理的基本概念(图像读入、图像信息查询、图像显示和图像存储)
- 自动驾驶——图像识别的学习笔记
- c#调用系统资源大集合(二)
- goc 介绍与源代码分析
- opencv android模版匹配,基于opencv模板匹配的目标检测方法
- Ecshop二次开发修改
- 服务等级协议SLA到底是在说什么?
- 流式处理术语解释:Exactly-once与Effectively-once
- bean named 'transactionManager' available: No matching PlatformTransactionManager bean found for qua
- html语言设置图片位置,HTML中如何设置图片位置
- 计算机PS属性怎么改,电脑不用ps怎么改图片分辨率 教你修改图片分辨率的方法...
- java 从set取值_怎样从java集合类set中取出数据?
- 如何找到计算机上的画图拦,机子里的画图和计算机没有了
- 苹果蓝牙耳机怎么样?与airpods媲美的无线耳机推荐
- 破解不加微信看朋友圈
- 使用二分法来解决的一些问题
- python --windos系统托盘
热门文章
- servicenow CSA考试 可用学习资料
- ruby 程序员修炼之道_面向系统管理员的Ruby
- javax.net.ssl.SSLException: hostname in certificate didn't match:
- WPF打印京东电子面单(可以异步)
- python验证角谷_6.4验证角谷猜想
- 模仿jva的逻辑写个kotlin实体类。
- Java中文件流的基本用法
- 为什么视频异常分析是最适合互联网时代的内容管理工具?
- PhD Debate-6 预告|深度推荐系统的探索与实践
- AI+医疗与生命科学技术原理