上一节我们已经把DefaultMQPushConsumer的大体流程分析了一下,从这节开始我们分析一部分消息处理的细节问题。
继续在DefaultMQPushConsumerImpl的pullMessage方法中有个ProcessQueue,待会我们来分析这个队列的作用。

 public void pullMessage(final PullRequest pullRequest) {final ProcessQueue processQueue = pullRequest.getProcessQueue();...}

在多线程处理消息过程中,是通过线程池在线程池中各个消息处理逻辑同时进行,代码如下:

public void pullMessage(final PullRequest pullRequest) {...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);...}public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;this.messageListener = messageListener;this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));}

pull获得消息如果直接提交到线程池中很难监控和控制,例如消息堆积数量、重复处理某些消息,延迟处理消息。rocketmq是通过一个快照类ProcessQueue,在pushConsumer运行的时候,每个Message Queue都会对应一个ProcessQueue对象,保存该Message Queue消息处理状态的快照。

ProcessQueue是通过TreeMap和读写锁实现。TreeMap里以Message Queue的offset作为key,以消息内容引用为value,保存了所有从MessageQueue获取到但是还没有被处理的消息。

下面我来看看代码中逻辑控制:

public void pullMessage(final PullRequest pullRequest) {...// 如果processQueue消息数量>队列级别的流量控制阈值,默认情况下,每个消息队列最多缓存1000条消息if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}// 如果队列缓存消息的大小超过100M(默认),考虑到批量消息瞬时可能超过100Mif (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (!this.consumeOrderly) {// 判断是否大于最大消息便宜跨度(默认2000)if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}}...}

通过上面源码可以看出PushConsumer会判断获取但未处理的消息个数,消息总大小、offset跨度,任何一个超过限制就隔一段时间再拉取消息,从而达到控制流量的目的。

RocketMQ学习笔记三之【DefaultMQPushConsumer流量控制】相关推荐

  1. RocketMQ学习笔记

    RocketMQ学习笔记 文章目录 RocketMQ学习笔记 前言 为什么要使用消息队列? 解耦 异步 削峰 使用了消息队列会有什么缺点? 消息队列如何选型? 如何保证消息队列是高可用的? 如何保证消 ...

  2. JavaEE 企业级分布式高级架构师(二十)RocketMQ学习笔记(2)

    RocketMQ学习笔记 进阶篇 消息样例 普通消息 消息发送 发送同步消息 发送异步消息 单向发送消息 三种发送方式的对比 消费消息 顺序消息 如何保证顺序 顺序的实现 MessageListene ...

  3. RocketMQ学习笔记(8)----RocketMQ的Producer API简介

    在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...

  4. J2EE学习笔记三:EJB基础概念和知识 收藏

    J2EE学习笔记三:EJB基础概念和知识 收藏 EJB正是J2EE的旗舰技术,因此俺直接跳到这一章来了,前面的几章都是讲Servlet和JSP以及JDBC的,俺都懂一些.那么EJB和通常我们所说的Ja ...

  5. tensorflow学习笔记(三十二):conv2d_transpose (解卷积)

    tensorflow学习笔记(三十二):conv2d_transpose ("解卷积") deconv解卷积,实际是叫做conv_transpose, conv_transpose ...

  6. Ethernet/IP 学习笔记三

    Ethernet/IP 学习笔记三 原文为硕士论文: 工业以太网Ethernet/IP扫描器的研发 知网网址: http://kns.cnki.net/KCMS/detail/detail.aspx? ...

  7. RocketMQ学习笔记(7)----RocketMQ的整体架构

    1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...

  8. iView学习笔记(三):表格搜索,过滤及隐藏列操作

    iView学习笔记(三):表格搜索,过滤及隐藏某列操作 1.后端准备工作 环境说明 python版本:3.6.6 Django版本:1.11.8 数据库:MariaDB 5.5.60 新建Django ...

  9. 吴恩达《机器学习》学习笔记三——多变量线性回归

    吴恩达<机器学习>学习笔记三--多变量线性回归 一. 多元线性回归问题介绍 1.一些定义 2.假设函数 二. 多元梯度下降法 1. 梯度下降法实用技巧:特征缩放 2. 梯度下降法的学习率 ...

最新文章

  1. [CVPR 2020] RandLA-Net:大场景三维点云语义分割新框架(已开源)
  2. 蚂蚁金服对研发高要求的领域建模能力是指什么?
  3. EXC中时间控件的使用
  4. go 17个字符串函数使用示例
  5. VS中修改站点运行方式(集成 Or 经典)
  6. 最简单的delphi启动画面(转)
  7. 计算机代码如何求三角形面积,简单的程序来计算三角形的面积
  8. ajax异步session值不唯一 总是改变 解决办法
  9. MongoDB应用之自增id
  10. Windows下mysql5.7修改root密码
  11. 使用OneR算法进行分类(Python实现)
  12. 满意度调查中的NPS题目怎么设置?
  13. oracle v diag info,V$DIAG_INFO及V$DIAG_CRITICAL_ERROR视图
  14. 用html制作双色球代码,Html5 canvas 绘制彩票走势图
  15. 20221208AD域控服务器问题解决记录--lsass.exe上传流量异常
  16. Excel 神操作,利用公式对比两个不同的表,替换内容(保姆级教程)
  17. 车载多媒体Android开发平台学习心得
  18. Linux文件 IO 和标准 IO简介
  19. 解决问题:Class JavaLaunchHelper is implemented in both
  20. OPPO A73线刷包下载_OPPO A73密码忘记了?来这里搞定!

热门文章

  1. 解决warning MSB8012:问题
  2. ISM解释结构模型——研究系统结构关系情况
  3. js中ondblclick
  4. onclick传两参数,需要转义双引号,不然只能传一个数值
  5. 好用的在线客服系统有哪些
  6. 使用nisebosh方式部署cloudfoundry多节点二
  7. 解压压缩软件推荐 —— Bandizip
  8. 【OSATE学习笔记】Model Analyses 模型分析方法
  9. Stable-Diffusion|文生图 拍立得纪实风格的Lora 图例(三)
  10. 通过Python实现目标点经纬度的自动查询