官方文档参考:what is the prefetch limit for?

ActiveMQ的设计目标之一是成为高性能的消息总线。这意味着使用SEDA(Staged Event Driven Architecture)架构异步处理尽可能多的任务。为了高效使用网络资源,broker使用push模型把消息分发给消费者。这样可以确保消费者的本地消息缓冲区中,总有待处理的消息。替代的方案是消费者显式的从broker中pull消息。单独pull消息并不是很高效,并且会显著增加每条消息的时延。

然而,如果不限制每次push给消费者的消息个数将会是危险的,过多的消息会导致消费者客户端资源耗尽。尤其是当消息的消费速度显著慢于消息的分发速度时。为了避免这种情况,ActiveMQ使用预取极限(prefetch limit)来限制一次性分发给单个消费者的最大消息个数。消费者则使用预取极限(prefetch limit)来设置其消息缓冲区的大小。

当broker分发给一个消费者prefetch limit个消息之后,在消费者ACK了至少50%的消息(具体来说,是prefetch limit/2个消费者接收到的消息)之前不会再次分发消息至该消费者。当broker接收到这些消息的ACK之后,会再次分发prefetch limit/2个消息到消费者的消息缓冲区。

消息量很大时,为了获得高性能,推荐使用大预取值(prefetch value)。消息量较小,且每个消息需要处理很长时间时,预取值(prefetch value)应该设置为1。这样可以确保一个消费者一次仅处理一个消息。指定预取极限(prefetch limit)为0,broker将不会向消费者push消息,取而代之的是,消费者从broker poll消息,每次拉取一条。

需要注意的是,有些消费者不能缓存消息,针对这类消费者,预取值(prefetch value)必须设置为1。例如一些通过Ruby这类脚本语言实现的消费者,通过STOMP协议连接,就没有客户端消息缓冲区的概念。

可以在ActiveMQConnectionFactory或ActiveMQConnection上指定ActiveMQPrefetchPolicy的实例。这样就可以设置所有单独的预取值(prefetch value)-每个不同类型的服务有不同的值。例如:

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

用来和broker建立连接的URI上也可以配置预取极限(prefetch limit)。修改所有消费者类型的预取极限(prefetch limit),可以像下面这样设置连接URI

tcp://localhost:61616?jms.prefetchPolicy.all=50

仅想设置队列类型消费者的预取极限(prefetch limit),可以像下面这样设置连接URI

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

通过设置ActiveMQConnectionFactory的ActiveMQPrefetchPolicy与在URI中直接配置效果是等价的

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10);
factory.setPrefetchPolicy(prefetchPolicy);

可以使用目的地选项(Destination Options)修改单个消费者的预取极限(prefetch limit)

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=5");
consumer = session.createConsumer(queue);

队列堆积10条消息,prefetch设置为10,执行结果如下

prefetchSize设置为5,执行结果如下

参考ActiveMQ的源代码:

public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {......public Message receive() throws JMSException {this.checkClosed();this.checkMessageListener();this.sendPullCommand(0L);MessageDispatch md = this.dequeue(-1L);if(md == null) {return null;} else {this.beforeMessageIsConsumed(md);this.afterMessageIsConsumed(md, false);return this.createActiveMQMessage(md);}}protected void sendPullCommand(long timeout) throws JMSException {this.clearDispatchList();// 同步获取数据,prefetch的值设置为0,consumer使用pull方式从broker获取数据// prefetch的值大于0,broker主动push数据至consumerif(this.info.getPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {MessagePull messagePull = new MessagePull();messagePull.configure(this.info);messagePull.setTimeout(timeout);this.session.asyncSendPacket(messagePull);}}public void setMessageListener(MessageListener listener) throws JMSException {this.checkClosed();// 异步获取数据,consumer不会主动从broker pull数据// prefetch值必须大于0以便broker主动push数据至consumerif(this.info.getPrefetchSize() == 0) {throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");} else {if(listener != null) {boolean wasRunning = this.session.isRunning();if(wasRunning) {this.session.stop();}this.messageListener.set(listener);this.session.redispatch(this, this.unconsumedMessages);if(wasRunning) {this.session.start();}} else {this.messageListener.set((Object)null);}}}......
}

this.unconsumedMessages.dequeue(timeout)最终会调用this.unconsumedMessages.dequeue(timeout),unconsumedMessages有两种实现类,SimplePriorityMessageDispatchChannel和FifoMessageDispatchChannel,这两个实现类的dequeue方法都仅返回一个消息,这个很重要,预取多个,但是处理一个

使用消费者池消费消息时,预取将会是一个问题。只有当消费者关闭时,未消费的预取消息才会被释放,但是为了池中的消费者可以被重用,其关闭会被延迟至消费者池关闭时执行。这将导致预取的消息直到消费者被重用时才会被消费(预取多个,处理一个)。这个特性从性能方面来说可以被接受。然而,当池中有超过1个的消费者存在时,这将导致消息投递顺序错乱。出于这个原因,org.apache.activemq.jms.pool.PooledConnectionFactory并不池化消费者(pool consumers)

Spring的CachingConnectionFactory支持池化消费者(pooling consumers)(尽管默认是关闭的)。假如你在Spring的DefaultMessageListenerContainer(DMLC)中配置了一个包含多个消费者线程的CachingConnectionFactory,那么你要么关闭CachingConnectionFactory中的消费者池(默认是关闭的),要么虽然使用消费者池,但是将预取值(prefetch value)设置为0。这样每次调用receive(timeout)方法,消费者将使用poll的方式获取消息。通常建议关闭Spring的 CachingConnectionFactory以及任意允许池化JMS消费者框架中的消费者缓存。

让DMLC在其生命周期内自己管理缓存,参考DMLC源码注释:

Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
in combination with dynamic scaling. Ideally, don't use it with a message
listener container at all, since it is generally preferable to let the
listener container itself handle appropriate caching within its lifecycle.

需要注意的是,Spring的DMLC以及其CACHE_CONSUMER缓存等级不受这个问题的影响!Spring的DMLC在某种意义上并不池化消费者(pool consumers),即其内部并不使用由多个消费者实例组成的消费者池。相反,它会缓存消费者,也就是说,在DMLC实例的整个生命周期中,都会重用同一个JMS 消费者对象来接收所有消息。其表现很像单纯的手写JMS代码-创建JMS连接,会话,消费者,然后用这个消费者实例接收所有消息。

因此在Spring的DMLC中使用CACHE_CONSUMER是没问题的,即使是使用多个消费者线程,除非你在使用XA事务。CACHE_CONSUMER对XA事务并不生效。然而本地JMS事务和非事务性消费者在Spring的DMLC中使用CACHE_CONSUMER是没问题的。

Spring的配置文件示例如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"default-autowire="byName"><bean id="testPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"><property name="queuePrefetch" value="50"></property></bean><bean id="testConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="${test.server}" /><property name="userName" value="${test.user}" /><property name="password" value="${test.password}" /><property name="useAsyncSend" value="true" /><property name="sendTimeout" value="5000" /><property name="prefetchPolicy" ref="testPrefetchPolicy" /></bean><bean id="testCachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="testConnectionFactory" /><property name="sessionCacheSize" value="150" /><property name="cacheConsumers" value="false" /></bean><bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="${test.queuename}" /></bean><bean id="testMessageListener" class="com.sean.TestConsumer" /><bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="testCachedConnectionFactory" /><property name="destination" ref="testQueue" /><property name="messageListener" ref="testMessageListener" /><property name="cacheLevelName" value="CACHE_CONSUMER" /><property name="concurrentConsumers" value="10" /><property name="maxConcurrentConsumers" value="40" /><property name="receiveTimeout" value="5000" /><property name="sessionTransacted" value="false" /><property name="idleConsumerLimit" value="1" /></bean>
</beans>

Spring的DMLC继承关系如下:

方法的调用过程如下:

AbstractJmsListeningContainer.afterPropertiesSet
DefaultMessageListenerContainer.initialize
AbstractPollingMessageListenerContainer.initialize
AbstractJmsListeningContainer.initialize
DefaultMessageListenerContainer.doInitialize
DefaultMessageListenerContainer.scheduleNewInvoker
AbstractJmsListeningContainer.rescheduleTaskIfNecessary
DefaultMessageListenerContainer.doRescheduleTask
DefaultMessageListenerContainer.AsyncMessageListenerInvoker.run

下面上几段重点代码:

// Adapt default cache level.
if (this.cacheLevel == CACHE_AUTO) {this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
}

根据concurrentConsumers个数创建AsyncMessageListenerInvoker

protected void doInitialize() throws JMSException {synchronized (this.lifecycleMonitor) {for (int i = 0; i < this.concurrentConsumers; i++) {scheduleNewInvoker();}}
}private void scheduleNewInvoker() {AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();if (rescheduleTaskIfNecessary(invoker)) {// This should always be true, since we're only calling this when active.this.scheduledInvokers.add(invoker);}
}

每个AsyncMessageListenerInvoker中仅保存了一个私有的MessageConsumer

在接收到消息进行处理的过程中,如果cacheLevel为CACHE_CONSUMER,并且AsyncMessageListenerInvoker中已经创建过consumer,之后将一直使用该consumer处理消息,并不会重复创建consumer

private boolean invokeListener() throws JMSException {initResourcesIfNecessary();boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);this.lastMessageSucceeded = true;return messageReceived;
}private void initResourcesIfNecessary() throws JMSException {if (getCacheLevel() <= CACHE_CONNECTION) {updateRecoveryMarker();}else {if (this.session == null && getCacheLevel() >= CACHE_SESSION) {updateRecoveryMarker();this.session = createSession(getSharedConnection());}if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {this.consumer = createListenerConsumer(this.session);synchronized (lifecycleMonitor) {registeredWithDestination++;}}}
}

我们接下来继续看调用过程

DefaultMessageListenerContainer.AsyncMessageListenerInvoker.invokeListener
AbstractPollingMessageListenerContainer.receiveAndExecute
AbstractPollingMessageListenerContainer.doReceiveAndExecute
AbstractPollingMessageListenerContainer.receiveMessage
AbstractMessageListenerContainer.doExecuteListener
AbstractMessageListenerContainer.invokeListener
AbstractMessageListenerContainer.doInvokeListener
MessageListener.onMessage

调用MessageConsumer的receive方法主动获取代码,和之前ActiveMQMessageConsumer类的源码连接起来了

protected Message receiveMessage(MessageConsumer consumer) throws JMSException {return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
}

还有需要注意的是Camel JMS或者ActiveMQ组件在其内部使用Spring的DMLC。因此上述所有关于Spring的DMLC以及CACHE_CONSUMER的内容对于这两个Camel组件同样适用

设置一个相对较大的预取值(prefetch value)将会导致更高的性能。因此默认值通常大于1000。预取值(prefetch value)影响客户端RAM中保存的消息个数。因此当RAM的大小是限制因素时,设置一个小预取值(prefetch value)例如1或者10会是更好的选择。

ActiveMQ prefetch属性相关推荐

  1. 预加载属性 preload 与 prefetch 区别

    原文链接:https://waynegong.cn/posts/40528.html TLDR: preload 告诉浏览器立即加载资源; prefetch 告诉浏览器在空闲时才开始加载资源: pre ...

  2. QQ企业邮箱+Spring+Javamail+ActiveMQ(发送企业邮件)

    原来有个教程是关于Javamail的,但是那个是自己写Javamail的发送过程,这次不同的是:使用Spring的Mail功能,使用了消息队列. 先看一下设想的场景 不过本文重点不是消息队列,而是使用 ...

  3. MediaPlayer属性大全

    播放: MediaPlayer.Play() 暂停: MediaPlayer.Pause() 定位: MediaPlayer.SetCurrentEntry(lWhichEntry)       Me ...

  4. prefetch 和preload_Preload和Prefetch以及前端项目中的配置

    1.vuecli3.x or 4.x默认打包之后,部署到服务器上的项目,会对静态资源的标签上默认加载preload或者prefetch属性, 啥是preload和prefetch呢? Preload的 ...

  5. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

  6. RabbitMQ学习笔记

    目录 一.MQ 的相关概念 MQ是什么? MQ三大优势 MQ的劣势 MQ 的产品 RabbitMQ核心 JMS 安装 二.HelloWorld 三.Work Queues(轮训) 消息应答 Rabbi ...

  7. 6-RabbitMQ实战

    文章目录 1. 消息中间件概述 1.1 MQ简介 1.1.1 什么是消息中间件 1.1.2为什么使用MQ 1.1.3消息队列(MQ)应用场景 1.1.4 MQ的劣势 1.1.5常见的 MQ 产品 1. ...

  8. 【谷粒商城】【rabbitMQ】笔记

    文章目录 1. 消息中间件概述 1.1. 什么是消息中间件 1.2. AMQP 和 JMS 1.2.1. AMQP 1.2.2. JMS 1.2.3. AMQP 与 JMS 区别 1.3. 消息队列产 ...

  9. RabbitMQ-第二天

    1. RabbitMQ 高级特性 1.1 消息可靠性投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递 ...

最新文章

  1. 构建一个给爬虫使用的代理IP池
  2. 使用Apple的感受
  3. ugui unity 取消选择_UGUI中几种不规则按钮的实现方式
  4. 本周DOT将解锁302.47万枚,上周共质押919.9万枚
  5. mysql 字符串拼接的几种方式
  6. swfobject java_SWFObject 2.1以上版本语法介绍
  7. 1分钟了解微信收款商业版
  8. mac mysql 移动硬盘_MAC上安装Fuse for macOS以支持读取NTFS格式的移动硬盘
  9. 计算机主机时间不保存,电脑主板系统时间不能保存
  10. 【Windows编程】Windows Socket API介绍
  11. java学习总结(16.06.28)包装类和包装类的常用方法
  12. 产业区块链生态架构图
  13. 阿里研究员吴翰清:世界需要什么样的智能系统
  14. Excel读写合集:Excel读写小白从不知所措到轻松上手
  15. mysql 8.0 初识
  16. 如何区分集线器、交换机、路由器呢
  17. 阿里云服务器支持IPV6和CND的详细教程
  18. 首席新媒体黎想教程:活动运营主题策划,及前端玩法设计!
  19. 2021宜宾叙州区二中高考成绩查询,宜宾叙州第二中学2021年排名
  20. word中不能设置首字下沉的一个原因

热门文章

  1. stable-diffusion领域prompt集合
  2. BlueTooth: 高品质蓝牙音频的设计考虑
  3. 想在AI前沿技术领域工作?7家公司能让你梦想成真
  4. eventhandler java_Velocity之EventHandler
  5. 【CSS】虚线的两种实现方式
  6. eclipse怎么导入压缩包_eclipse压缩包安装教程
  7. 港联证券|“面值退”增多凸显A股市场化进程良性态势
  8. UML--部署图详解
  9. 世界杯掠影系列(一)
  10. 深度学习下的医学图像分析