RocketMQ学习笔记三之【DefaultMQPushConsumer流量控制】
上一节我们已经把DefaultMQPushConsumer的大体流程分析了一下,从这节开始我们分析一部分消息处理的细节问题。
继续在DefaultMQPushConsumerImpl的pullMessage方法中有个ProcessQueue,待会我们来分析这个队列的作用。
public void pullMessage(final PullRequest pullRequest) {final ProcessQueue processQueue = pullRequest.getProcessQueue();...}
在多线程处理消息过程中,是通过线程池在线程池中各个消息处理逻辑同时进行,代码如下:
public void pullMessage(final PullRequest pullRequest) {...DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);...}public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;this.messageListener = messageListener;this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));}
pull获得消息如果直接提交到线程池中很难监控和控制,例如消息堆积数量、重复处理某些消息,延迟处理消息。rocketmq是通过一个快照类ProcessQueue,在pushConsumer运行的时候,每个Message Queue都会对应一个ProcessQueue对象,保存该Message Queue消息处理状态的快照。
ProcessQueue是通过TreeMap和读写锁实现。TreeMap里以Message Queue的offset作为key,以消息内容引用为value,保存了所有从MessageQueue获取到但是还没有被处理的消息。
下面我来看看代码中逻辑控制:
public void pullMessage(final PullRequest pullRequest) {...// 如果processQueue消息数量>队列级别的流量控制阈值,默认情况下,每个消息队列最多缓存1000条消息if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}// 如果队列缓存消息的大小超过100M(默认),考虑到批量消息瞬时可能超过100Mif (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (!this.consumeOrderly) {// 判断是否大于最大消息便宜跨度(默认2000)if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}}...}
通过上面源码可以看出PushConsumer会判断获取但未处理的消息个数,消息总大小、offset跨度,任何一个超过限制就隔一段时间再拉取消息,从而达到控制流量的目的。
RocketMQ学习笔记三之【DefaultMQPushConsumer流量控制】相关推荐
- RocketMQ学习笔记
RocketMQ学习笔记 文章目录 RocketMQ学习笔记 前言 为什么要使用消息队列? 解耦 异步 削峰 使用了消息队列会有什么缺点? 消息队列如何选型? 如何保证消息队列是高可用的? 如何保证消 ...
- JavaEE 企业级分布式高级架构师(二十)RocketMQ学习笔记(2)
RocketMQ学习笔记 进阶篇 消息样例 普通消息 消息发送 发送同步消息 发送异步消息 单向发送消息 三种发送方式的对比 消费消息 顺序消息 如何保证顺序 顺序的实现 MessageListene ...
- RocketMQ学习笔记(8)----RocketMQ的Producer API简介
在RocketMQ中提供了三种发送消息的模式: 1.NormalProducer(普通) 2.OrderProducer(顺序) 3.TransactionProducer(事务) 下面来介绍一下pr ...
- J2EE学习笔记三:EJB基础概念和知识 收藏
J2EE学习笔记三:EJB基础概念和知识 收藏 EJB正是J2EE的旗舰技术,因此俺直接跳到这一章来了,前面的几章都是讲Servlet和JSP以及JDBC的,俺都懂一些.那么EJB和通常我们所说的Ja ...
- tensorflow学习笔记(三十二):conv2d_transpose (解卷积)
tensorflow学习笔记(三十二):conv2d_transpose ("解卷积") deconv解卷积,实际是叫做conv_transpose, conv_transpose ...
- Ethernet/IP 学习笔记三
Ethernet/IP 学习笔记三 原文为硕士论文: 工业以太网Ethernet/IP扫描器的研发 知网网址: http://kns.cnki.net/KCMS/detail/detail.aspx? ...
- RocketMQ学习笔记(7)----RocketMQ的整体架构
1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...
- iView学习笔记(三):表格搜索,过滤及隐藏列操作
iView学习笔记(三):表格搜索,过滤及隐藏某列操作 1.后端准备工作 环境说明 python版本:3.6.6 Django版本:1.11.8 数据库:MariaDB 5.5.60 新建Django ...
- 吴恩达《机器学习》学习笔记三——多变量线性回归
吴恩达<机器学习>学习笔记三--多变量线性回归 一. 多元线性回归问题介绍 1.一些定义 2.假设函数 二. 多元梯度下降法 1. 梯度下降法实用技巧:特征缩放 2. 梯度下降法的学习率 ...
最新文章
- [CVPR 2020] RandLA-Net:大场景三维点云语义分割新框架(已开源)
- 蚂蚁金服对研发高要求的领域建模能力是指什么?
- EXC中时间控件的使用
- go 17个字符串函数使用示例
- VS中修改站点运行方式(集成 Or 经典)
- 最简单的delphi启动画面(转)
- 计算机代码如何求三角形面积,简单的程序来计算三角形的面积
- ajax异步session值不唯一 总是改变 解决办法
- MongoDB应用之自增id
- Windows下mysql5.7修改root密码
- 使用OneR算法进行分类(Python实现)
- 满意度调查中的NPS题目怎么设置?
- oracle v diag info,V$DIAG_INFO及V$DIAG_CRITICAL_ERROR视图
- 用html制作双色球代码,Html5 canvas 绘制彩票走势图
- 20221208AD域控服务器问题解决记录--lsass.exe上传流量异常
- Excel 神操作,利用公式对比两个不同的表,替换内容(保姆级教程)
- 车载多媒体Android开发平台学习心得
- Linux文件 IO 和标准 IO简介
- 解决问题:Class JavaLaunchHelper is implemented in both
- OPPO A73线刷包下载_OPPO A73密码忘记了?来这里搞定!