为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的。

由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功。因此,publisher 和 consumer 都需要一种机制,来确保消息投递成功了和消息消费成功了。

在 AMQP 0-9-1 中,消费者处理完消息后返回 acknowledgement,被称为消费者回执(Consumer Acknowledgement)。中间件收到生产者的消息后返回 acknowledgement,被称为发布确认(Publisher Confirm)。

无论是把消息从生产者投递到 RabbitMQ 节点,还是把消息从 RabbitMQ 节点投递到消费者,为了保证消息投递的可靠性和安全性,Publisher Confirm 和 Consumer Acknowledgement 都是必需的。

消费者回执

当 RabbitMQ 把消息投递给消费者,它需要知道消费者是否成功接收并处理了消息。什么样的逻辑是最优的取决于你的系统,因此,它主要是由应用程序根据实际情况来决定的。

在 AMQP 0-9-1 中,消费者回执是在调用 basic.consume 方法或 basic.get 方法时实现的。

其实,在 RabbitMQ 入门指南(二)中提到的 Message acknowledgment 就是消费者回执。

Delivery Tag

Delivery Tag 就是 Message Delivery 的标识符,它是一个 64 位的长整型值。

当消费者注册成功后,RabbitMQ 将会调用 basic.deliver 方法,该方法会携带唯一的 delivery_tag,将消息发送给消费者。

delivery_tag 的作用域是进行通信的 channel(信道),因此,消息回执必须在相同的 channel 中进行。否则,会导致 unknown delivery tag 异常。

回执模式

根据使用的回执模式(acknowledgement mode),RabbitMQ 会考虑消息是否投递成功了。

如果没有开启手动回执功能(默认关闭),消息一旦发送出去,RabbitMQ 就认为消息投递成功了。

如果开启了手动回执功能,只有当 RabbitMQ 收到了消费者手动返回的 ack 时,才会认为消息投递成功了。

将 autoAck 参数设置为 false,即表示开启手动回执功能。 这时,RabbitMQ 队列中的消息分为了两个部分:

  • 一部分是等待发送给消费者的消息(Ready);
  • 另一部分是已经发送给消费者但还未收到消费者的 ack 信号的消息(Unacked)。

手动发送的回执,可以是 positive(乐观的),也可以是 negative(悲观的)。你可以使用下面的方法来手动发送回执。

  • basic.ack 用于发送 positive ack,明确表示消费者成功地处理了消息,消息会被 RabbitMQ 删除。
  • basic.reject 用于发送 negative ack,明确表示消费者没有成功处理消息,即拒绝消息。其中,requeue 参数如果为 false,RabbitMQ 会删除该消息;如果为 true,RabbitMQ 会让该消息重新进入队列,以便重新投递给消费者。basic.reject 每次只能拒绝一条消息。
  • basic.nack 和 basic.reject 的功能类似,但 basic.nack 支持 multiple 参数,它可以一次拒绝多条消息。

手动发送回执通常会结合 prefetch(预读取)功能一起使用。

注意: 如果将 requeue 设置为 false,可以启用【死信队列】的功能。死信队列可以通过检测被拒绝或未送达的消息来追踪和处理问题。

发布确认

当生产者将消息发送出去之后,消息是否能成功地到达服务器呢?

默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说生产者根本不知道消息有没有成功到达 RabbitMQ 服务器。那么,消息在到达服务器之前,就可能出现丢失的问题。

为了解决这个问题,RabbitMQ 提供了两种解决方案:

  • 事务机制。
  • 发布确认机制。

事务机制

与事务机制相关的方法有三个:

  • channel.txSelect 用于将当前的信道设置成事务模式,开启事务。
  • channel.txCommit 用于提交事务。
  • channel.txRollback 用于回滚事务。

在开启事务之后,便可以发布消息给 RabbitMQ,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这时我们便可以捕获异常,并执行 channel.txRollback 方法回滚事务。

注意: RabbitMQ 中的事务机制与 MySQL 中的事务概念并不相同。

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有当消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,同时可以重发消息。

但是,使用事务机制会严重降低 RabbitMQ 的消息吞吐量。为此,RabbitMQ 提供了一个改进方案——发布确认机制(生产者确认机制)。

发布确认机制

发布确认机制(Publisher Confirm):生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会被指定一个唯一的 ID(从 1 开始),一旦消息被发送给所匹配的队列,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样,生产者便知道消息已经正确到达了 RabbitMQ 服务器。

如果消息和队列是持久化的,那么,Basic.Ack 会在消息写入磁盘之后发出。

事务机制在一条消息发送之后会阻塞发送端,需同步等待 RabbitMQ 的回应,收到回应后才能继续发送下一条消息。相比之下,发布确认机制的优点在于它可以是异步的,一旦发布一条消息,生产者就可以在等待信道返回确认的同时继续发送下一条消息,当消息得到确认之后,生产者便可以通过回调方法来处理该确认消息(Basic.Ack),如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令。

生产者通过调用 channel.confirmSelect 方法,将信道设置为confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。后续该信道中发送的消息都会被 RabbitMQ 服务器 ack 或 nack 一次。

生产者通过调用 channel.waitForConfirms 方法判断是否收到 了RabbitMQ 服务器 的 ack 或 nack。

注意事项:

  • 事务机制和发布确认机制是互斥的,不能在同一个信道中共存。
  • 事务机制和发布确认机制确保的都是把消息成功地发送给 RabbitMQ 的交换器。如果此交换器没有匹配的队列,那么消息也会丢失。发送方可以配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
  • 发布确认机制的优势在于它并不一定需要同步确认。

发布确认机制可以采用以下两种方式来提升 confirm 的效率。

  • 批量 confirm:每发送一批消息后,调用channel.waitForConfirms 方法,等待服务器的确认返回。相比普通的 confirm,批量 confirm 极大地提升了confirm 的效率,但问题在于出现 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能或许是不升反降的。
  • 异步 confirm:提供一个回调方法,服务端确认了一条或多条消息后,客户端会回调这个方法进行处理。

异步 confirm

异步 confirm 的逻辑实现比较复杂。

Channel 中提供了 addConfirmListener 方法,可以添加 ConfirmListener 这个回调接口。

ConfirmListener 接口包含两个方法:handleAck 和 handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。

这两个方法中都包含 delivery_tag 参数(用来标记消息的唯一有序序号) 。我们需要为每一个信道维护一个 unconfirm 的消息序号集合,每发送一条消息,集合中的元素就加 1。每当调用 ConfirmListener 中的 handleAck 方法时,unconfirm 集合就删掉相应的一条(multiple 设置为 false)或者多条(multiple 设置为 true)记录。 unconfirm 集合最好采用有序集合 SortedSet 的存储结构。

RabbitMQ 消费者回执和发布确认相关推荐

  1. RabbitMQ(三)发布确认

    4.1 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队 ...

  2. RabbitMQ(三)发布确认 Publisher Confirms

    代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...

  3. springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...

  4. 十、RabbitMQ发布确认高级

    RabbitMQ发布确认高级 发布确认SpringBoot版本 发布确认Springboot版本 简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制. 介 ...

  5. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  6. RabbitMQ入门(三)消息应答与发布确认

    前言: 消息应答与发布确认都是保证消息不丢失.而重复消费问题则是消息幂等性.(之后会说幂等性) 消息应答: 应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理 ...

  7. RabbitMQ 消费者确认(Consumer Acknowledgements)

    摘要 RabbitMQ关于数据安全有两个特性:发布者确认(Publisher Confirms)和消费者确认(Consumer Acknowledegements).前者用于MQ服务器(broker) ...

  8. RabbitMQ异步发布确认

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是 ...

  9. RabbitMQ单个发布确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候 ...

最新文章

  1. python类不支持多继承_Java和C#等不允许多继承类,但是Python是可以的
  2. Mybatis常见面试题(三)
  3. 楼燚(yì)航的blog URL
  4. 9.1 图像处理的基本概念(图像读入、图像信息查询、图像显示和图像存储)
  5. 自动驾驶——图像识别的学习笔记
  6. c#调用系统资源大集合(二)
  7. goc 介绍与源代码分析
  8. opencv android模版匹配,基于opencv模板匹配的目标检测方法
  9. Ecshop二次开发修改
  10. 服务等级协议SLA到底是在说什么?
  11. 流式处理术语解释:Exactly-once与Effectively-once
  12. bean named 'transactionManager' available: No matching PlatformTransactionManager bean found for qua
  13. html语言设置图片位置,HTML中如何设置图片位置
  14. 计算机PS属性怎么改,电脑不用ps怎么改图片分辨率 教你修改图片分辨率的方法...
  15. java 从set取值_怎样从java集合类set中取出数据?
  16. 如何找到计算机上的画图拦,机子里的画图和计算机没有了
  17. 苹果蓝牙耳机怎么样?与airpods媲美的无线耳机推荐
  18. 破解不加微信看朋友圈
  19. 使用二分法来解决的一些问题
  20. python --windos系统托盘

热门文章

  1. servicenow CSA考试 可用学习资料
  2. ruby 程序员修炼之道_面向系统管理员的Ruby
  3. javax.net.ssl.SSLException: hostname in certificate didn't match:
  4. WPF打印京东电子面单(可以异步)
  5. python验证角谷_6.4验证角谷猜想
  6. 模仿jva的逻辑写个kotlin实体类。
  7. Java中文件流的基本用法
  8. 为什么视频异常分析是最适合互联网时代的内容管理工具?
  9. PhD Debate-6 预告|深度推荐系统的探索与实践
  10. AI+医疗与生命科学技术原理