消费者

  • 接收模式
  • 监听
  • 确认
  • 取消确认
  • 确认超时
  • 死信主题
  • Retry letter topic(重试)

consumer是通过订阅附加到topic,然后接收消息的过程。

Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 你能够通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。

接收模式

可以通过同步(sync) 或者异步(async)的方式从brokers接受消息。

发送模式 说明
同步接收 同步模式,在收到消息之前都是被阻塞的。
异步接收 异步接收模式会立即返回一个 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。

监听

客户端库为使用者提供侦听器实现。例如,Java客户端提供了一个MessageListener接口。在这个接口中,一旦接受到新的消息,received方法将被调用。

确认

消费者在成功消费一个消息后,向 Broker 发送一个确认请求。 然后,这条被消费的消息将被永久保存,只有在所有订阅者都确认后才会被删除。 如果希望消息被消费者确认后仍然保留下来,可配置消息保留策略实现。

对于批处理消息,你可以启用批处理索引确认,以避免将确认的消息分派给消费者。 关于批量索引确认的细节,请参见batching。

消息可以通过以下两种方式之一进行确认。

  • 被单独承认。通过单独的确认,消费者确认每条消息,并向broker发送确认请求。
  • 累积确认模式。通过累积确认,消费者只确认其收到的最后一条消息,所有之前(包含此条)的消息,都不会被再次发送给那个消费者。

如果你想单独确认消息,你可以使用以下API。

consumer.acknowledge(msg);

如果你想累计确认消息,你可以使用以下API。

consumer.acknowledgeCumulative(msg);

Note
在共享订阅模式中不能使用累积确认,因为共享订阅模式涉及多个可以访问同一订阅的使用者。在共享订阅模式下,消息会被单独确认。

取消确认

当一个消费者未能消费一个消息并打算再次消费它时,这个消费者应该向 Broker 发送一个否定的确认。 然后,Broker 将把这个消息重新传递给消费者。

根据消费订阅模式,单独或累积地对消息进行负面确认。

在独占和故障转移订阅模式下,使用者只会消极地确认他们收到的最后一条消息。

在共享和密钥共享订阅模式中,消费者可以单独否定地确认消息。

请注意,订单订阅类型为否定, 比如Exclusive,Failover和Key_Shared之类的消息可能会导致发送失败的消息以不符合原始顺序的方式到达使用者。

如果你想否定地确认信息,你可以使用以下API。

//调用这个api,消息会被否定地确认。
consumer.negativeAcknowledge(msg);

Note
如果启用了批处理,则一批中的所有消息都将重新传递给使用者。

确认超时

如果一个消息没有被成功消费,而你想让 Broker 自动重新交付这个消息,那么你可以为未被认可的消息启用自动重新交付机制。 在启用自动重新交付的情况下,客户端跟踪整个acktimeout时间范围内的未确认的消息,并在指定确认超时时向代理发送重新交付未确认的消息请求。

Note
如果启用了批处理,则一批中的所有消息都将重新传递给使用者。
否定确认优于确认超时,因为否定确认更精确地控制单个消息的重新传递,并在消息处理时间超过确认超时时避免无效的重新传递。

死信主题

当消费者无法成功消费某些messages时,死信主题使您能够消费新messages。在这种机制中,无法使用的消息存储在单独的主题中,称为死信主题。您可以决定如何处理死信主题中的消息。

以下示例显示了如何使用默认死信主题在Java客户端中启用死信topic:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()).subscribe();

默认死信主题使用以下格式:

<topicname>-<subscriptionname>-DLQ

如果要指定死信主题的名称,请使用以下Java客户端示例:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic("your-topic-name").build()).subscribe();

死信主题依赖消息重试由于确认超时或否定确认,消息被重新传递。如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认。

Note
目前,死信主题在共享和密钥共享订阅模式下启用。

Retry letter topic(重试)

很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。

默认情况下,自动重试被禁用。您可以将enableRetry设置为true,以启用对使用者的自动重试。

如下例子所示,消费者会从重试主题消费消息。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).enableRetry(true).receiverQueueSize(100).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry").build()).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

如果您想要将消息放入重试队列,您可以使用以下API。

consumer.reconsumeLater(msg,3,TimeUnit.SECONDS);

Pulsar Messaging(二)相关推荐

  1. Pulsar Messaging(一)

    Messaging 简介 消息 Producers 发送模式 访问模式 Compression(压缩) 批量处理 分块 简介 Pulsar 采用 发布-订阅的设计模式(简称 pub-sub),在这种模 ...

  2. 为什么要选择Apache Pulsar(二)

    这是介绍Apache Pulsar关键特性系列文章的第二篇.Pulsar是由Yahoo开发并开源的下一代发布订阅消息系统.在第一篇文章里,我们介绍了Pulsar对消息模型的灵活支持.多租户.多地域复制 ...

  3. 5、mysql->pulsar->mysql

    本文实现mysql数据通过pulsar消息队列,异步传输到mysql库表的全过程代码实现,包括pulsar环境搭建,代码运行效果展示.全部使用Flink cdc最新版本实现,虽然代码精简,但确属全网最 ...

  4. 关于Pulsar与Kafka

    在本系列的Pulsar和Kafka比较文章中,我将引导您完成我认为重要的几个领域,并且对于人们选择强大,高可用性,高性能的流式消息传递平台至关重要.消息传递模型(Messaging model)是用户 ...

  5. pulsar分析以及各消息队列对比

    rabbitmq rabbitmq的消息可靠性 rabbitmq-幂等引出的性能分析 rocketmq 从rabbitmq到rocketmq kafka 从rocketmq到kafka:集群.一致性与 ...

  6. 即将直播 | Pulsar Summit Asia 2022 峰会最新议程

    关于 Pulsar Summit Pulsar Summit 是 Apache Pulsar 社区年度盛会,它将分布在世界各地的 Apache Pulsar 项目 Contributor.Commit ...

  7. 议题预告 | Pulsar Summit Asia 2022:Day 2 - 英文演讲

    关于 Pulsar Summit Pulsar Summit 是 Apache Pulsar 社区年度盛会,它将分布在世界各地的 Apache Pulsar 项目 Contributor.Commit ...

  8. 社区盛会 | Pulsar Summit Asia 2022 议程全览

    关于 Pulsar Summit Pulsar Summit 是 Apache Pulsar 社区年度盛会,它将分布在世界各地的 Apache Pulsar 项目 Contributor.Commit ...

  9. 大数据系列之:安装pulsar详细步骤

    大数据系列之:安装pulsar详细步骤 一.Pulsar版本和jdk对应关系 二.安装JDK 三.设置和激活jdk环境变量 四.下载和解压Pulsar 五.查看Pulsar目录 六.启动Pulsar ...

最新文章

  1. 斯坦福大学新研究:声波、光波等都是RNN
  2. chrome添加来自其他网站的扩展程序
  3. sscanf的高级用法
  4. Reading and Writing to Binary Files
  5. TextView IME option
  6. 向量化计算cell_吴恩达老师课程笔记系列第24节-Octave教程之向量化和作业(6)
  7. AIO 开始不定时的抛异常: java.io.IOException: 指定的网络名不再可用
  8. 使用ArcGIS实现WGS84经纬度坐标到北京54高斯投影坐标的转换
  9. oracle卸载重新安装失败,Oracle卸载重新安装——实战
  10. Microsoft Edge浏览器设置编码方式
  11. python实现携程网站爬取
  12. 正则表达式验证生日手机号信息
  13. VS2015下解决:error LNK2019: 无法解析的外部符号 __iob_func
  14. Apple可以改善下一代Apple Watch的4种心率变异性数据的方法
  15. OO,OO以后,及其极限(转)
  16. matplotlib切换主题风格
  17. 硬件工程师需要学习那些本领
  18. 广告标识符用途的选择
  19. Redis是什么?看完就知道了
  20. 狗狗家面试题:第 k 大元素(快速排序法)

热门文章

  1. MUSBMHDRC USB 2.0 编程指南解读4
  2. raid磁盘阵列常见故障类型和解决方案
  3. 数据压缩实验六 MPEG音频压缩编码
  4. 二叉树层次遍历算法 python_二叉树的遍历详解:前、中、后、层次遍历(Python实现)...
  5. 大数据对小微商户线上开店的应用
  6. 专利授权具有哪些好处?
  7. Python爬虫实战(一) ---- 制作自己的桌面天气小工具
  8. AIDE的使用规则配置
  9. #import C:\Program Files\CommonFiles\System\ado\msado15.dll no_namespace rename(EOF,adoEOF)问题
  10. springboot项目使用swagger时拦截器需要放开哪些URL