ActiveMQ prefetch属性
官方文档参考:what is the prefetch limit for?
ActiveMQ的设计目标之一是成为高性能的消息总线。这意味着使用SEDA(Staged Event Driven Architecture)架构异步处理尽可能多的任务。为了高效使用网络资源,broker使用push模型把消息分发给消费者。这样可以确保消费者的本地消息缓冲区中,总有待处理的消息。替代的方案是消费者显式的从broker中pull消息。单独pull消息并不是很高效,并且会显著增加每条消息的时延。
然而,如果不限制每次push给消费者的消息个数将会是危险的,过多的消息会导致消费者客户端资源耗尽。尤其是当消息的消费速度显著慢于消息的分发速度时。为了避免这种情况,ActiveMQ使用预取极限(prefetch limit)来限制一次性分发给单个消费者的最大消息个数。消费者则使用预取极限(prefetch limit)来设置其消息缓冲区的大小。
当broker分发给一个消费者prefetch limit个消息之后,在消费者ACK了至少50%的消息(具体来说,是prefetch limit/2个消费者接收到的消息)之前不会再次分发消息至该消费者。当broker接收到这些消息的ACK之后,会再次分发prefetch limit/2个消息到消费者的消息缓冲区。
消息量很大时,为了获得高性能,推荐使用大预取值(prefetch value)。消息量较小,且每个消息需要处理很长时间时,预取值(prefetch value)应该设置为1。这样可以确保一个消费者一次仅处理一个消息。指定预取极限(prefetch limit)为0,broker将不会向消费者push消息,取而代之的是,消费者从broker poll消息,每次拉取一条。
需要注意的是,有些消费者不能缓存消息,针对这类消费者,预取值(prefetch value)必须设置为1。例如一些通过Ruby这类脚本语言实现的消费者,通过STOMP协议连接,就没有客户端消息缓冲区的概念。
可以在ActiveMQConnectionFactory或ActiveMQConnection上指定ActiveMQPrefetchPolicy的实例。这样就可以设置所有单独的预取值(prefetch value)-每个不同类型的服务有不同的值。例如:
persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)
用来和broker建立连接的URI上也可以配置预取极限(prefetch limit)。修改所有消费者类型的预取极限(prefetch limit),可以像下面这样设置连接URI
tcp://localhost:61616?jms.prefetchPolicy.all=50
仅想设置队列类型消费者的预取极限(prefetch limit),可以像下面这样设置连接URI
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
通过设置ActiveMQConnectionFactory的ActiveMQPrefetchPolicy与在URI中直接配置效果是等价的
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10);
factory.setPrefetchPolicy(prefetchPolicy);
可以使用目的地选项(Destination Options)修改单个消费者的预取极限(prefetch limit)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=5");
consumer = session.createConsumer(queue);
队列堆积10条消息,prefetch设置为10,执行结果如下
prefetchSize设置为5,执行结果如下
参考ActiveMQ的源代码:
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {......public Message receive() throws JMSException {this.checkClosed();this.checkMessageListener();this.sendPullCommand(0L);MessageDispatch md = this.dequeue(-1L);if(md == null) {return null;} else {this.beforeMessageIsConsumed(md);this.afterMessageIsConsumed(md, false);return this.createActiveMQMessage(md);}}protected void sendPullCommand(long timeout) throws JMSException {this.clearDispatchList();// 同步获取数据,prefetch的值设置为0,consumer使用pull方式从broker获取数据// prefetch的值大于0,broker主动push数据至consumerif(this.info.getPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {MessagePull messagePull = new MessagePull();messagePull.configure(this.info);messagePull.setTimeout(timeout);this.session.asyncSendPacket(messagePull);}}public void setMessageListener(MessageListener listener) throws JMSException {this.checkClosed();// 异步获取数据,consumer不会主动从broker pull数据// prefetch值必须大于0以便broker主动push数据至consumerif(this.info.getPrefetchSize() == 0) {throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");} else {if(listener != null) {boolean wasRunning = this.session.isRunning();if(wasRunning) {this.session.stop();}this.messageListener.set(listener);this.session.redispatch(this, this.unconsumedMessages);if(wasRunning) {this.session.start();}} else {this.messageListener.set((Object)null);}}}......
}
this.unconsumedMessages.dequeue(timeout)最终会调用this.unconsumedMessages.dequeue(timeout),unconsumedMessages有两种实现类,SimplePriorityMessageDispatchChannel和FifoMessageDispatchChannel,这两个实现类的dequeue方法都仅返回一个消息,这个很重要,预取多个,但是处理一个
使用消费者池消费消息时,预取将会是一个问题。只有当消费者关闭时,未消费的预取消息才会被释放,但是为了池中的消费者可以被重用,其关闭会被延迟至消费者池关闭时执行。这将导致预取的消息直到消费者被重用时才会被消费(预取多个,处理一个)。这个特性从性能方面来说可以被接受。然而,当池中有超过1个的消费者存在时,这将导致消息投递顺序错乱。出于这个原因,org.apache.activemq.jms.pool.PooledConnectionFactory并不池化消费者(pool consumers)
Spring的CachingConnectionFactory支持池化消费者(pooling consumers)(尽管默认是关闭的)。假如你在Spring的DefaultMessageListenerContainer(DMLC)中配置了一个包含多个消费者线程的CachingConnectionFactory,那么你要么关闭CachingConnectionFactory中的消费者池(默认是关闭的),要么虽然使用消费者池,但是将预取值(prefetch value)设置为0。这样每次调用receive(timeout)方法,消费者将使用poll的方式获取消息。通常建议关闭Spring的 CachingConnectionFactory以及任意允许池化JMS消费者框架中的消费者缓存。
让DMLC在其生命周期内自己管理缓存,参考DMLC源码注释:
Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
in combination with dynamic scaling. Ideally, don't use it with a message
listener container at all, since it is generally preferable to let the
listener container itself handle appropriate caching within its lifecycle.
需要注意的是,Spring的DMLC以及其CACHE_CONSUMER缓存等级不受这个问题的影响!Spring的DMLC在某种意义上并不池化消费者(pool consumers),即其内部并不使用由多个消费者实例组成的消费者池。相反,它会缓存消费者,也就是说,在DMLC实例的整个生命周期中,都会重用同一个JMS 消费者对象来接收所有消息。其表现很像单纯的手写JMS代码-创建JMS连接,会话,消费者,然后用这个消费者实例接收所有消息。
因此在Spring的DMLC中使用CACHE_CONSUMER是没问题的,即使是使用多个消费者线程,除非你在使用XA事务。CACHE_CONSUMER对XA事务并不生效。然而本地JMS事务和非事务性消费者在Spring的DMLC中使用CACHE_CONSUMER是没问题的。
Spring的配置文件示例如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"default-autowire="byName"><bean id="testPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"><property name="queuePrefetch" value="50"></property></bean><bean id="testConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="${test.server}" /><property name="userName" value="${test.user}" /><property name="password" value="${test.password}" /><property name="useAsyncSend" value="true" /><property name="sendTimeout" value="5000" /><property name="prefetchPolicy" ref="testPrefetchPolicy" /></bean><bean id="testCachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="testConnectionFactory" /><property name="sessionCacheSize" value="150" /><property name="cacheConsumers" value="false" /></bean><bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="${test.queuename}" /></bean><bean id="testMessageListener" class="com.sean.TestConsumer" /><bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="testCachedConnectionFactory" /><property name="destination" ref="testQueue" /><property name="messageListener" ref="testMessageListener" /><property name="cacheLevelName" value="CACHE_CONSUMER" /><property name="concurrentConsumers" value="10" /><property name="maxConcurrentConsumers" value="40" /><property name="receiveTimeout" value="5000" /><property name="sessionTransacted" value="false" /><property name="idleConsumerLimit" value="1" /></bean>
</beans>
Spring的DMLC继承关系如下:
方法的调用过程如下:
AbstractJmsListeningContainer.afterPropertiesSet
DefaultMessageListenerContainer.initialize
AbstractPollingMessageListenerContainer.initialize
AbstractJmsListeningContainer.initialize
DefaultMessageListenerContainer.doInitialize
DefaultMessageListenerContainer.scheduleNewInvoker
AbstractJmsListeningContainer.rescheduleTaskIfNecessary
DefaultMessageListenerContainer.doRescheduleTask
DefaultMessageListenerContainer.AsyncMessageListenerInvoker.run
下面上几段重点代码:
// Adapt default cache level.
if (this.cacheLevel == CACHE_AUTO) {this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
}
根据concurrentConsumers个数创建AsyncMessageListenerInvoker
protected void doInitialize() throws JMSException {synchronized (this.lifecycleMonitor) {for (int i = 0; i < this.concurrentConsumers; i++) {scheduleNewInvoker();}}
}private void scheduleNewInvoker() {AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();if (rescheduleTaskIfNecessary(invoker)) {// This should always be true, since we're only calling this when active.this.scheduledInvokers.add(invoker);}
}
每个AsyncMessageListenerInvoker中仅保存了一个私有的MessageConsumer
在接收到消息进行处理的过程中,如果cacheLevel为CACHE_CONSUMER,并且AsyncMessageListenerInvoker中已经创建过consumer,之后将一直使用该consumer处理消息,并不会重复创建consumer
private boolean invokeListener() throws JMSException {initResourcesIfNecessary();boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);this.lastMessageSucceeded = true;return messageReceived;
}private void initResourcesIfNecessary() throws JMSException {if (getCacheLevel() <= CACHE_CONNECTION) {updateRecoveryMarker();}else {if (this.session == null && getCacheLevel() >= CACHE_SESSION) {updateRecoveryMarker();this.session = createSession(getSharedConnection());}if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {this.consumer = createListenerConsumer(this.session);synchronized (lifecycleMonitor) {registeredWithDestination++;}}}
}
我们接下来继续看调用过程
DefaultMessageListenerContainer.AsyncMessageListenerInvoker.invokeListener
AbstractPollingMessageListenerContainer.receiveAndExecute
AbstractPollingMessageListenerContainer.doReceiveAndExecute
AbstractPollingMessageListenerContainer.receiveMessage
AbstractMessageListenerContainer.doExecuteListener
AbstractMessageListenerContainer.invokeListener
AbstractMessageListenerContainer.doInvokeListener
MessageListener.onMessage
调用MessageConsumer的receive方法主动获取代码,和之前ActiveMQMessageConsumer类的源码连接起来了
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));
}
还有需要注意的是Camel JMS或者ActiveMQ组件在其内部使用Spring的DMLC。因此上述所有关于Spring的DMLC以及CACHE_CONSUMER的内容对于这两个Camel组件同样适用
设置一个相对较大的预取值(prefetch value)将会导致更高的性能。因此默认值通常大于1000。预取值(prefetch value)影响客户端RAM中保存的消息个数。因此当RAM的大小是限制因素时,设置一个小预取值(prefetch value)例如1或者10会是更好的选择。
ActiveMQ prefetch属性相关推荐
- 预加载属性 preload 与 prefetch 区别
原文链接:https://waynegong.cn/posts/40528.html TLDR: preload 告诉浏览器立即加载资源; prefetch 告诉浏览器在空闲时才开始加载资源: pre ...
- QQ企业邮箱+Spring+Javamail+ActiveMQ(发送企业邮件)
原来有个教程是关于Javamail的,但是那个是自己写Javamail的发送过程,这次不同的是:使用Spring的Mail功能,使用了消息队列. 先看一下设想的场景 不过本文重点不是消息队列,而是使用 ...
- MediaPlayer属性大全
播放: MediaPlayer.Play() 暂停: MediaPlayer.Pause() 定位: MediaPlayer.SetCurrentEntry(lWhichEntry) Me ...
- prefetch 和preload_Preload和Prefetch以及前端项目中的配置
1.vuecli3.x or 4.x默认打包之后,部署到服务器上的项目,会对静态资源的标签上默认加载preload或者prefetch属性, 啥是preload和prefetch呢? Preload的 ...
- RabbitMQ入门到精通
RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...
- RabbitMQ学习笔记
目录 一.MQ 的相关概念 MQ是什么? MQ三大优势 MQ的劣势 MQ 的产品 RabbitMQ核心 JMS 安装 二.HelloWorld 三.Work Queues(轮训) 消息应答 Rabbi ...
- 6-RabbitMQ实战
文章目录 1. 消息中间件概述 1.1 MQ简介 1.1.1 什么是消息中间件 1.1.2为什么使用MQ 1.1.3消息队列(MQ)应用场景 1.1.4 MQ的劣势 1.1.5常见的 MQ 产品 1. ...
- 【谷粒商城】【rabbitMQ】笔记
文章目录 1. 消息中间件概述 1.1. 什么是消息中间件 1.2. AMQP 和 JMS 1.2.1. AMQP 1.2.2. JMS 1.2.3. AMQP 与 JMS 区别 1.3. 消息队列产 ...
- RabbitMQ-第二天
1. RabbitMQ 高级特性 1.1 消息可靠性投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递 ...
最新文章
- 构建一个给爬虫使用的代理IP池
- 使用Apple的感受
- ugui unity 取消选择_UGUI中几种不规则按钮的实现方式
- 本周DOT将解锁302.47万枚,上周共质押919.9万枚
- mysql 字符串拼接的几种方式
- swfobject java_SWFObject 2.1以上版本语法介绍
- 1分钟了解微信收款商业版
- mac mysql 移动硬盘_MAC上安装Fuse for macOS以支持读取NTFS格式的移动硬盘
- 计算机主机时间不保存,电脑主板系统时间不能保存
- 【Windows编程】Windows Socket API介绍
- java学习总结(16.06.28)包装类和包装类的常用方法
- 产业区块链生态架构图
- 阿里研究员吴翰清:世界需要什么样的智能系统
- Excel读写合集:Excel读写小白从不知所措到轻松上手
- mysql 8.0 初识
- 如何区分集线器、交换机、路由器呢
- 阿里云服务器支持IPV6和CND的详细教程
- 首席新媒体黎想教程:活动运营主题策划,及前端玩法设计!
- 2021宜宾叙州区二中高考成绩查询,宜宾叙州第二中学2021年排名
- word中不能设置首字下沉的一个原因