云栖君导读:本文节选自云栖社区系列丛书《RocketMQ原理与实战解析》,作者:阿里巴巴数据专家杨开元。本节将重点讲解RocketMQ不同类型的消费者。

根据使用者对读取操作的控制情况,分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。

1.1.1 DefaultMQPushConsumer的使用

使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍。

代码清单1-1   DefaultMQPushConsumer示例

DefaultMQPushConsumer需要设置三个参数:一是这个Consumer的GroupName,二是NameServer的地址和端口号,三是Topic的名称,下面详细介绍。

Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。

RocketMQ支持两种消息模式:Clustering 和 Broadcasting。

  • 在Clustering 模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。

  • 在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3:port”。

Topic名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息(Tag是在发送消息时设置的标签)。在填写Tag参数的位置,用null或者“*”表示要消费这个Topic的所有消息。

1.1.2 DefaultMQPushConsumer的处理流程

本节通过分析源码来说明DefaultMQPushConsumer的处理流程。

DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl类中,消息的处理逻辑是在pullMessage这个函数里的PullCallBack中。在PullCallBack函数里有个switch语句,根据从Broker返回的消息类型做相应的处理,具体处理逻辑可以查看源码。

代码清单1-2    DefaultMQPushConsuer的处理逻辑

DefaultMQPushConsuer的源码中有很多PullRequest语句,比如DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest),为什么“PushConsumer”中使用“PullRequest”呢?这是通过“长轮询”方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。

Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端;首先是加大Server端的工作量,进而影响Server的性能,其次Client的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。

Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来有可能没有被及时处理。

“长轮询”方式是通过Client端和Server端的配合,既拥有Pull的优点,又能达到保证实时性的目的。我们结合源码来分析:

代码清单1- 3  发送Pull消息代码片段

源码中有这一行设置语句

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis),设置Broker最长阻塞时间,默认设置是15秒,注意是Broker在没有新消息的时候才阻塞,有消息会立刻返回。

从Broker的源码中可以看出,服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次 waitForRunning一段时候(默认是5秒),然后后再Check。默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过Request里面的 BrokerSuspendMaxTimeMillis,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。

长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。

1.1.3 DefaultMQPullConsumer

使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的函数,同时还需要做额外的事情。接下来结合org.apache.rocketmq.example.simple包中的例子源码来介绍。

示例代码的处理逻辑是逐个读取某Topic下所有Message Queue的内容,读完一遍后退出,主要处理额外的三件事情:

(1) 获取Message Queue并遍历

一个Topic包括多个Message Queue,如果这个Consumer需要获取Topic下所有的消息,就要遍历多有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。

(2) 维护Offsetstore

从一个Message Queue里拉取消息的时候,要传入Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。

(3) 根据不同的消息状态做不同的处理

拉取消息的请求发出后,会返回:FOUND,NO_MATCHED_MSG,NO_NEW_MSG,OFFSET_ILLEGAL四种状态,要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG,分别表示获取到消息和没有新的消息

实际情况中可以把while(true)放到外层,达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset,所以PullConsumer有更多的自主性和灵活性。

----------------

本文节选自云栖社区系列丛书《RocketMQ原理与实战解析》,点击左下角【阅读原文】进入赢取图书!

end

阿里90后工程师利用ARM硬件特性开启安卓8终端“上帝模式”

API管理工具Swagger介绍及Springfox原理分析

国内首家!阿里云宣布全面提供IPv6服务

机器学习和数据科学领域必读的10本免费书籍

从构建分布式秒杀系统聊聊限流特技

更多精彩

详解RocketMQ不同类型的消费者相关推荐

  1. rocketmq 消息指定_详解RocketMQ不同类型的消费者

    原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...

  2. 从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘

    本文转载自:从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘 事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务.本文对Roc ...

  3. Apollo进阶课程㉙丨Apollo控制技术详解——控制器的类型

    原文链接:进阶课程㉙丨Apollo控制技术详解--控制器的类型 控制主要是为了弥补数学模型和物理世界执行之间的不一致性.对于自动驾驶而言,规划的轨迹和车辆的实际运行轨迹并不完全一致,控制器按照规划轨迹 ...

  4. 详解JavaScript变量类型判断及domReady原理 写得很好

    原文:详解JavaScript变量类型判断及domReady原理 我们知道,在开发JavaScript时候,经常要判断JavaScript变量类型,此 JavaScript教程 详细介绍JS变量的判断 ...

  5. mybatis 鉴别其_MyBatis之Mapper XML 文件详解(四)-JDBC 类型和嵌套查询

    MyBatis之Mapper XML 文件详解(四)-JDBC 类型和嵌套查询 白玉 IT哈哈 支持的 JDBC 类型 为了未来的参考,MyBatis 通过包含的 jdbcType 枚举型,支持下面的 ...

  6. python中byte类型_详解python string类型 bytes类型 bytearray类型

    搜索热词 一.python3对文本和二进制数据做了区分.文本是Unicode编码,str类型,用于显示.二进制类型是bytes类型,用于存储和传输.bytes是byte的序列,而str是unicode ...

  7. 详解三大专利类型之二:实用新型专利

      大家好,我是英子老师.作为一名知识产权专家,深耕于专利行业十余年,具有丰富的专利工作经验:曾在大型专利代理机构从事专利代理工作.专利质检工作(抽查代理机构的专利代理人的撰写质量并评分):之后在知名 ...

  8. 详解三大专利类型之三:外观设计专利

      大家好,我是英子老师.作为一名知识产权专家,深耕于专利行业十余年,具有丰富的专利工作经验:曾在大型专利代理机构从事专利代理工作.专利质检工作(抽查代理机构的专利代理人的撰写质量并评分):之后在知名 ...

  9. 消息中间件系列(九):详解RocketMQ的架构设计、关键特性、与应用场景

    内容大纲: RocketMQ的简介与演进 RocketMQ的架构设计 RocketMQ的关键特性 RocketMQ的应用场景 RocketMQ的简介 RocketMQ一个纯java.分布式.队列模型的 ...

最新文章

  1. JZOJ 5268. 旅行
  2. STL set容器的一点总结
  3. H5网页App和纯原生的App差距在哪?
  4. nodeJs 是什么?你需要先想清楚这个问题,才能学习nodejs (介绍)
  5. ubuntu下安装phpredis的模块扩展
  6. easyui表单提交,后台获取不到值
  7. [bbk2907]第3集 - Chapter 02 - RAC的安装过程中需要注意的要点
  8. SpringBoot中AOP实现落地——Filter(过滤器)、Intercepter(拦截器)、Aspect(Spring AOP)
  9. Python 里 and、or 的计算规则
  10. 第19集 轮廓的提取
  11. 如何点击单选框 radio 后面的文字,选中单选框
  12. 音乐类APP竞品分析报告 酷狗音乐 QQ音乐酷我音乐网易云音乐
  13. 如何实现电子签章效果
  14. 【社招】 中/高级C++ Developer - 美国顶尖交易公司Akuna Capital–上海
  15. 无痕HOOK方式=硬断+VEH
  16. 七牛云视频转码 php,学习猿地-我的扩展包分享 - 七牛云视频转码
  17. 方舟生存进化服务器存档位置,方舟生存进化如何转移存档
  18. C#爬虫爬取京东自营笔记本
  19. Dcloud学习资料汇总+视频教程
  20. 苏宁易购账户莫名消失 个人信息泄露并非个例

热门文章

  1. python实现微信小程序反编译
  2. 【配置】JDK8的安装与配置
  3. 学习EasyExcel实现excel文件的批量上传的笔记
  4. React Hook 监听路由切换实时同步渲染页面功能
  5. 编码器—解码器和注意力机制
  6. 赝势平面波计算机软件,第3章 赝势平面波方法(i).doc
  7. Android 虚拟机与类加载机制
  8. 【日常训练】Help Far Away Kingdom(Codeforces 99A)
  9. 树莓派搭建百度云同步
  10. 软件测试|最全的Python for循环和while循环使用介绍