本文的ActiveMQ都基于5.10版本,参考了ActiveMQ官方文档:http://activemq.apache.org/failover-transport-reference.html。

一、ActiveMQ集群

  集群是个比较广泛的概念,它有多种形式,关于消息服务的集群,大概分为Consumer集群(消费者集群)和Broker集群(消息服务器集群)两种:
1、对于消费者集群,对于队列消费者,主要是:
  1)保证如果某一个消费者死亡了,任何它没有确认完的消息会被重传别的正常的消费者来消费;
  2)如果一个消费者消费消息过快,就可以比别的消费者得到更多的消息;
  3)如果一个消费者消费消息过慢,它就会被少得到消息。
  第1点几乎是所有JMS提供者都有的功能——消息重传机制(可以参考我的其他ActiveMQ博文)。第2点和第3点也是很正常的,因为大多消费者和线程是一一对应的关系,你消费速率快,当然可以自己去服务器拉取更多的消息。当然ActiveMQ在队列上给消费者提供了高性能的负载均衡策略。对于主题订阅者,由于每个订阅者接受到被推送的消息都和其他订阅者无关,所以处理相对简单,JMS也有持久订阅者这一概念,这里不多说。
2、对于消息服务器集群,主要是指:
  1)如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者能否自动连接到其他正常工作的消息服务器。
  2)如果集群中的某一台消息服务器宕机,该台服务器上未消费的消息能否在该台服务器恢复正常之前由其他服务器转发。
  3)集群环境中会不会导致某台消息服务器上只有消费者或者某台消息服务器上只有生产者。对于1,ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。使用它很简单,只需要在uri中配置就行了,语法如下:

failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN

例子:

failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
注:如果这样使用报错你可以试试这个:failover://(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100  (this way works in ActiveMQ 4.1.1 the one above does not)

二、配置属性

1、ActiveMQ客户端
  如果某个ActiveMQ客户端发现uri1地址失效了,它会立即转向uri地址列表中其他可以连接的消息服务器进行重连,以保证继续正常工作,请注意,并不是uri1失效了就会选则uri2重连,这种选择其他地址的方式默认是随机的,以保证负载均衡,如果你想关闭随机,可以transportOptions中加入randomize=false。
  transportOptions有多种参数可以选择,如下:
1)initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。
2)maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。
3)useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。
4)reconnectDelayExponent:默认为2.0,重连时使用的避让指数。
5)maxReconnectAttempts:5.6版本之前默认为-1,5.6版本及其以后,默认为0,0表示重连的次数无限,配置大于0可以指定最大重连次数。
6)startupMaxReconnectAttempts:默认为0,如果该值不为0,表示客户端接收到消息服务器发送来的错误消息之前尝试连接服务器的最大次数,一旦成功连接后,maxReconnectAttempts值开始生效,如果该值为0,则默认采用maxReconnectAttempts。详见FailoverTransport.java代码:

private int calculateReconnectAttemptLimit() {  int maxReconnectValue = this.maxReconnectAttempts;  if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {  maxReconnectValue = this.startupMaxReconnectAttempts;  }  return maxReconnectValue;
}

7)randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略,记住,这种随机策略在第一次选择URI列表中的地址时就开始生效,所以,如果为true的话,一个生产者和一个消费者的Failover连接地址都是两个URI的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者的尴尬境地。
8)backup:默认为false,表示是否在连接初始化时将URI列表中的所有地址都初始化连接,以便快速的失效转移,默认是不开启。
9)timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。这样说你肯定不是很明白,直接看代码吧,下面给出FailoverTransport.java类中oneway方法中的一段代码给你看你就明白了:

for (int i = 0; !disposed; i++) {  try {  // Wait for transport to be connected.  Transport transport = connectedTransport.get();  long start = System.currentTimeMillis();  boolean timedout = false;  while (transport == null && !disposed && connectionFailure == null  && !Thread.currentThread().isInterrupted()) {  if (LOG.isTraceEnabled()) {  LOG.trace("Waiting for transport to reconnect..: " + command);  }  long end = System.currentTimeMillis();  if (command.isMessage() && timeout > 0 && (end - start > timeout)) {  timedout = true;  if (LOG.isInfoEnabled()) {  LOG.info("Failover timed out after " + (end - start) + "ms");  }  break;  }  try {  reconnectMutex.wait(100);  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  if (LOG.isDebugEnabled()) {  LOG.debug("Interupted: " + e, e);  }  }  transport = connectedTransport.get();  }  // 其余的代码略

10)trackMessages:默认值为false,是否缓存在发送中(in-flight messages)的消息,以便重连时让新的Transport继续发送。默认是不开启。
11)maxCacheSize:默认131072,如果trackMessages为true,该值表示缓存消息的最大尺寸,单位byte。
12)updateURIsSupported:默认值为true,表示重连时客户端新的连接器(Transport)是否从消息服务接受接受原来的URI列表的更新,5.4及其以后的版本可用。如果关闭的话,会导致重连后连接器没有其他的URI地址可以Failover。
13)updateURIsURL:默认为null,从5.4及其以后的版本,ActiveMQ支持从文件中加载Failover的URI地址列表,URI还是以逗号分隔,updateURIsURL为文件路径。详见FailoverTransport.java中代码:

private void doUpdateURIsFromDisk() {  // If updateURIsURL is specified, read the file and add any new  // transport URI's to this FailOverTransport.  // Note: Could track file timestamp to avoid unnecessary reading.  String fileURL = getUpdateURIsURL();  if (fileURL != null) {  BufferedReader in = null;  String newUris = null;  StringBuffer buffer = new StringBuffer();  try {  in = new BufferedReader(getURLStream(fileURL));  while (true) {  String line = in.readLine();  if (line == null) {  break;  }  buffer.append(line);  }  newUris = buffer.toString();  } catch (IOException ioe) {  LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);  } finally {  if (in != null) {  try {  in.close();  } catch (IOException ioe) {  // ignore  }  }  }  processNewTransports(isRebalanceUpdateURIs(), newUris);  }
}

14)nested.*:默认为null,5.9及其以后版本可用,表示给嵌套的URL添加额外的选项。 以前,如果你想检测让死连接速度更快,你必须在wireFormat.maxInactivityDuration= 1000选项添加到失效转移列表中的所有嵌套的URL。例如:

failover:(tcp://host01:61616?wireFormat.maxInactivityDuration=1000,tcp://host02:61616?wireFormat.maxInactivityDuration=1000,tcp://host03:61616?wireFormat.maxInactivityDuration=1000)

而现在,你只需要这样:

failover:(tcp://host01:61616,tcp://host02:61616,tcp://host03:61616)?nested.wireFormat.maxInactivityDuration=1000

15)warnAfterReconnectAttempts.*:默认为10,5.10及其以后的版本可用,表示每次重连该次数后会打印日志告警,设置<=0的值表示禁用,FailoverTransport.java类的doReconnect()部分相关代码如下

int warnInterval = getWarnAfterReconnectAttempts();
if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {  LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",  uris, connectFailures);
}

16) reconnectSupported:默认为true,表示客户端是否应响应经纪人 ConnectionControl事件与重新连接(参见:rebalanceClusterClients)。
如果你使用Failover失效转移,则消息服务器在死亡的那一刻,你的生产者发送消息时默认将阻塞,但你可以设置发送消息阻塞的超时时间(注:timeout参数前面已经讲过了):

failover:(tcp://primary:61616)?timeout=3000

  上面的设置将导致如果3秒后连接还未建立,将导致消息发送失败,但这并不会导致该连接被kill,所以你可以过一阵子后再使用这一个连接来尝试发送消息。
  如果用户希望能追踪到重连过程,可以在ActiveMQConnectionFactory设置一个TransportListener,如下所示:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
factory.setTransportListener(new TransportListener() {  @Override  public void transportResumed() {  System.out.println("连接器已经恢复完毕!");  }  @Override  public void transportInterupted() {  System.out.println("连接器被中断了!");  }  @Override  public void onException(IOException error) {  System.out.println(error);  }  @Override  public void onCommand(Object command) {  System.out.println(command);  }
});

2、ActiveMQ Server端
  下面我们看看如何在消息服务器(broker)上配置失效转移Failover。
  在消息服务器这边,有一些选项可以将客户端更新到新的消息服务器,如下所示:
1)updateClusterClients:默认为false,如果为true,则会将broker集群的拓扑结构的改变信息传递给连接的客户端。
2)rebalanceClusterClients:默认为false,如果为true,则如果有新的消息服务器加入到消息服务器集群中,则连接的客户端将被要求重新平衡(asked to rebalance)。注意, priorityBackup=true能覆盖。
3)updateClusterClientsOnRemove:默认为false,如果为true,则当一个集群从网络中移除的时候将更新客户端。有了这个选项,可以在消息服务器移除时更新客户端,而不是仅仅只是新增消息服务器时更新。
(难道官方文档有问题:if true, will update clients when a cluster is removed from the network. Having this as separate option enables clients to be updated when new brokers join, but not when brokers leave.)
4)updateClusterFilter:默认为null,如果有值,将会是逗号分隔的正则表达式列表,用来过滤掉失效转移时的消息服务器集群中的服务器名称。
举例:

<broker>...  <transportConnectors>    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterFilter="*A*,*B*" />  </<transportConnectors>  ...</broker>

  如果面配置所示,如果updateClusterClients设置为true,则连接到该服务器的客户端的连接器地址Failover的URI列表只需要写这个服务器地址就行,如:failover://tcp://primary:61616,当有新的消息服务器加入时,这些客户端将自动被更新添加该新消息服务器的地址,如果发生网络或消息服务器宕机的事件,就可以重连接到新的消息服务器上。
  有时候我们希望客户端能优先选择某些消息服务器地址,比如既有本地服务器,又有远程服务器,我们希望本地的应用程序优先选择本地服务器进行连接,从5.6版本开始,ActiveMQ提供了优先级备份(priority backup )的特性,所以你可以让客户端自动重连到所谓的“优先级”URI,你可以在客户端如下配置URI地址:

failover:(tcp://local:61616,tcp://remote:61616)?randomize=false&priorityBackup=true

  如果上面这个地址被用于客户端使用,客户端将尝试并保持连接到本地(上面local表示的地址)的消息服务器,当然,如果本地的服务器故障,将转移到远程(remote代表的地址)服务器。默认情况下,只有URI列表中的第一个被视为优先(本地)URI,在某些情况下,你希望不止一个URI地址优先,则你可以使用priorityURIs参数:

failover:(tcp://local1:61616,tcp://local2:61616,tcp://remote:61616)?randomize=false&priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616

这样,客户端将视local1和local2都为优先URI。

ActiveMQ失效转移(Failover)相关推荐

  1. elasticjob已下线_elastic-job详解(四):失效转移

    elastic-job中最关键的特性之一就是失效转移.配置了失效转移之后,如果在任务执行过程中有一个执行实例挂了,那么之前被分配到这个实例的任务(或者分片)会在下次任务执行之前被重新分配到其他正常节点 ...

  2. mysql 失效转移_MySQL基于MHA的FailOver过程

    大家好,我是anyux.本文介绍MySQL基于MHA的FailOver过程. MHA FailOver过程详解 什么是FailOver 故障转移 主库宕机,一直到业务恢复正常的处理过程 如何处理Fai ...

  3. oracle failovermode,[WK-T]ORACLE 10G 配置故障转移(Failover)

    批注:查询结果中如果是NONE,说明这个连接没有使用TAF:如果和客户端tnsnames.ora配置中的相同,说明使用了TAF. 三.Service-Side TAF Service-Side TAF ...

  4. linux activemq修改端口号,linux下 activemq集群配置

    1.简述:回想老王打电话讲故事案例. 2.优势:解耦,异步,横向扩展,顺序保障,安全可靠... 3.JMS(java message service),是java平台中关于面向消息中间件的API,用于 ...

  5. 如何基于消息中间件实现分布式事务?万字长文给你答案!!

    写在前面 最近小伙伴们的要求越来越高,学完设计模式学高并发,学完高并发又想学Java8新特性,学完Java8新特性又要学Spring,学着Spring又让我整理一篇关于分布式事务的文章,而且还提出了要 ...

  6. 分布式事务之——基于消息中间件实现

    环境需求:假如某人有5个女朋友(有点复杂),每天晚上都会给他的女朋友打电话说晚安,那么每给一个女朋友打电话,其他女朋友都要进入等待状态.一个一个打下去...等打到最后一个已经是凌晨了,对方都睡了.那么 ...

  7. Java消息中间件(activeMQ)

    文章目录 **第一章 消息中间件概述** 1. 消息中间件的好处 2. 什么是消息中间件 3. 什么是JMS(规范) 4. 什么是AMQP(协议) 5. 几个常用消息中间对比 **第二章 初始JMS* ...

  8. ActiveMQ集群

    1.ActiveMQ集群介绍 1.为什么要集群? 实现高可用,以排除单点故障引起的服务中断 实现负载均衡,以提升效率为更多客户提供服务 2.集群方式 客户端集群:让多个消费者消费同一个队列 Broke ...

  9. websphere mq 查看队列中是否有数据_全网最全的 “消息队列”

    消息队列的使用场景 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. 1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信. 引入消息队列后架构如下: ...

  10. Elastic-Job功能特性

    .分布式调度协调:用 ZK实现注册中心 .错过执行作业重触发(Misfire) .支持并行调度(任务分片) .作业分片一致性,保证同一分片在分布式环境中仅一个执行实例 .弹性扩容缩容:将任务拆分为 n ...

最新文章

  1. python - DBUtils 连接池减少oracle数据库的连接数
  2. spring3.2 aop 搭建 (1)
  3. axios请求超时,设置重新请求的完美解决方法
  4. html 循环_一个不被程序员认为是编程语言的语言——HTML,你怎么看?
  5. 7-102 单词首字母大写 (15 分)
  6. ios 6.1中 Release问题
  7. html中的声明的作用域,Html/CSS 作用域
  8. AS 3.0 socket 通信,比较基础比较全【转载】
  9. 使用Springboot收发邮件,如此简单!
  10. 【Java】1818. 绝对差值和---使用二分查找,计算总体和,然后剪掉最大某个差值!!!
  11. 有没有测试牙齿需不需要修正的软件,小虎正畸:测一测你到底需不需要进行牙齿矫正?...
  12. 基于单片机智能婴儿车控制设计(毕业设计)
  13. 电脑异常蓝屏问题排查记录
  14. 关于网络游戏的影响(腾讯游戏)
  15. 洛谷P2058 海港
  16. [矩阵的QR分解系列一] 施密特(Schmidt)正交规范化
  17. 独立钻石C语言Mac,C Code Develo‪p for Mac-C Code Develo‪p Mac版下载 V1.0-PC6苹果网
  18. 社区说|Android 13 新特性 EROFS-只读文件系统解析
  19. [算法深究]奇葩排序
  20. 悠易科技京东云联合解决方案发布会成功举办

热门文章

  1. 电容式 指纹识别 android 智能硬件
  2. 陆奇:创业必知的10大驱动力,让你成为水塘中最大的鱼
  3. Alarm Clock C/C++ Version
  4. 计算机无法装补丁,Win7系统无法安装补丁提示缺少Service Pack系统组件的原因及解决方法...
  5. 基于长短期记忆网络(LSTM)的意见领袖对舆论风向的研究、网络暴力研究、LSTM情感分分类、意见领袖的影响力、神经网络实战、数据分析实战、蔡徐坤潘长江网络暴力事件、数据可视化实战、舆情研究
  6. 打败特斯拉!深度起底「偶然亿万富翁」贾跃亭的法拉第野心
  7. 《挚爱》简谱,网上没找到,自己写了个,非专业,敬请指教!
  8. 【Python军火库】smtplib+email:一起来用Python发送电子邮件吧!
  9. php u8t canonical,php – configure:error:utf8_mime2text()具有新的签名,但U8T_CANONICAL缺少...
  10. 自学简单编程可行吗?