文章目录

  • 1. 设计理念
  • 2. 消费者配置
    • 2.1 项目启动后,读取数据库中消费者配置
    • 2.2 项目启动时,声明内部队列
    • 2.3 项目运行时,动态的新增/减少Consumer配置
  • 3. 监听类配置
    • 3.1 应用队列的监听
    • 3.2 延迟队列的监听
    • 3.3 广播模式队列监听
  • 4. 项目启动时,初始化内部队列

若实现动态的上线下线Consumer,那么就不能使用@RabbitListener方式去声明消费者。

1. 设计理念

  1. 代码中只会存在一个监听类(这个类可以为每一个queue新增一个监听对象),动态的上线下线Consumer,本质上是将Queue加监听对象中。
  2. 通过http通信来只能修改一台机器,但集群所有机器都要增加Consumer,需要使用广播模式通知到每一台机器。

2. 消费者配置

在【RabbitMQ-8】SpringBoot2.x动态的创建Queue、Exchange、VirtualHost、Binding中,项目启动时,会初始化MQ的上下文对象配置(RabbitContextHolder)。

2.1 项目启动后,读取数据库中消费者配置

从数据库中拿到消费者的配置信息(下列代码是在静态代码块中进行模拟)。增加消费者,是将消费者与对应虚拟机的队列创建监听容器。在项目启动时,开启监听。

  1. 这个类依赖RabbitContextHolder类;
  2. 采用自动ACK的方式确认消息;
@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {//容器的缓存对象private static ConcurrentHashMap<String, List<SimpleMessageListenerContainer>> containerMapping = new ConcurrentHashMap<>();private static List<RabbitConsumer> rabbitConsumers = new ArrayList<>();//配置中的消费者static {RabbitConsumer consumer = new RabbitConsumer();//虚拟机名consumer.setVhost("/test");//队列名consumer.setQueueName("test.directQueue-1");//开启的消费者线程consumer.setConcurrency("2");//抓取的数量consumer.setPerfetch(10);//重试策略consumer.setRetryStrategyEnum(RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES);rabbitConsumers.add(consumer);}//项目启动时,动态的创建消费者@PostConstructvoid init() {//以虚拟主机分组Map<String, List<RabbitConsumer>> consumerList = rabbitConsumers.stream().collect(Collectors.groupingBy(RabbitConsumer::getVhost));List<SimpleMessageListenerContainer> containerList = new ArrayList<>();//遍历所有的消费者配置consumerList.forEach((vhost, consumers) -> {//判断内存是否缓存连接工厂(根据虚拟主机名)if (RabbitContextHolder.getConnectionFactory(vhost) == null) {log.error("不能连接到虚拟主机[{}]", vhost);return;}//初始化SimpleMessageListenerContainer监听容器consumers.forEach(consumer -> Optional.ofNullable(newContainer(consumer)).ifPresent(containerList::add));//将容器加入到缓存中containerMapping.put(vhost, containerList);});}/*** 创建监听容器** @param consumer 消费者* @return 监听容器*/public static SimpleMessageListenerContainer newContainer(RabbitConsumer consumer) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();//监听的队列container.setQueueNames(consumer.getQueueName());//自动ACKcontainer.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(consumer.getVhost()));container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(consumer.getVhost()));//预取的数量container.setPrefetchCount(consumer.getPerfetch());//是否自动声明 队列、交换机、绑定container.setAutoDeclare(false);//设置初始化后是否自动启动容器。container.setAutoStartup(true);//是否设置排他性,即该消费者独享该队列container.setExclusive(false);//消费者开启几个线程container.setConcurrency(consumer.getConcurrency());//消费者的监听container.setMessageListener(new NormalListener(consumer));//设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。container.setDefaultRequeueRejected(false);//当监听类中不处理异常,那么异常会回调该方法。container.setErrorHandler(e -> log.error("个性化处理:丢弃消息啦!"));//初始化这个类container.afterPropertiesSet();return container;}/*** 项目启动时,启动监听容器。*/public static void startAllConsumers() {containerMapping.values().forEach(it -> {for (SimpleMessageListenerContainer bean : it) {bean.start();}});}
}

2.2 项目启动时,声明内部队列

  1. 为什么需要创建内部——广播模式的队列
  1. 若是动态的增加/减少虚拟机配置,那么需要给集群所有机器的内存新增/减少配置,即调用RabbitContextHolder#addNewVHost方法。
  2. 若是动态的上线/下线Consumer,也需要为给集群所有机器的内存增加/减少消费者容器配置。

广播模式注意点:集群中每台机器在启动时都生成一个队列(队列名由Linux机器的ip地址+随机数组合成),每台机器创建一个Consumer去监听消息。当机器重启时,消费者会下线,对应的广播队列会自动删除。

  1. 为什么需要创建内部——延迟队列

因为上面采用的是自动ACK模式,当发生异常时,会将消息放入到延迟队列中等待下次消费。

@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {/*** 启动内部队列** @throws IOException*/public static void initializeInternalConsumers() throws IOException {//启动内部监听(广播监听)startInternalEventConsumer();//当消费者出现异常,开启延迟队列进行重试startRetryEventConsumer();}/*** 开启内部监听(广播模式)* <p>* 项目启动后,本机器开启(广播)消息监听。** @throws IOException*/private static void startInternalEventConsumer() {//为每一台机器声明一个独一无二的队列名,当机器上消费者下线时,自动删除。String queueName = String.format(RabbitConstants.INTERNAL_EVENT_QUEUE_TEMPLATE, getLinuxLocalIp(), getRandomNumber());RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST);Exchange exchange = ExchangeBuilder.fanoutExchange(RabbitConstants.INTERNAL_EVENT_EXCHANGE).durable(true).build();rabbitAdmin.declareExchange(exchange);rabbitAdmin.declareQueue(new Queue(queueName, true, false, true));//广播模式的路由键为""rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, RabbitConstants.INTERNAL_EVENT_EXCHANGE, "", null));//创建内部队列SimpleMessageListenerContainer simpleMessageListenerContainer =newInternalContainer(queueName, "1", ListenerType.NORMAL);simpleMessageListenerContainer.start();}/*** 创建延迟队列*/public static void startRetryEventConsumer() {RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST);Exchange directExchange = ExchangeBuilder.directExchange(RabbitConstants.INTERNAL_DELAYED_EXCHANGE).delayed().durable(true).build();rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareQueue(new Queue(RabbitConstants.INTERNAL_RETRY_QUEUE, true, false, false));rabbitAdmin.declareBinding(new Binding(RabbitConstants.INTERNAL_RETRY_QUEUE,Binding.DestinationType.QUEUE,RabbitConstants.INTERNAL_DELAYED_EXCHANGE,RabbitConstants.INTERNAL_RETRY_ROUTING_KEY,null));//3-10表示初始化开启3个线程,当负载增加时,线程数缓慢得增加到10个线程。SimpleMessageListenerContainer simpleMessageListenerContainer = newInternalContainer(RabbitConstants.INTERNAL_RETRY_QUEUE,"3-10",ListenerType.RETRY);simpleMessageListenerContainer.start();}/*** 创建内部监听容器(广播模式&&延迟队列)** @param queueNames  队列名* @param concurrency 并发数* @return*/public static SimpleMessageListenerContainer newInternalContainer(String queueNames, String concurrency, ListenerType listenerType) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setQueueNames(queueNames);container.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST));container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(RabbitConstants.INTERNAL_VHOST));container.setAutoStartup(true);container.setAutoDeclare(false);container.setExclusive(false);//该参数默认是250,按照参数修改。container.setPrefetchCount(1);//3-10 默认是3个,若是负载上来的话,会缓慢增长到10个。container.setConcurrency(concurrency);if (ListenerType.NORMAL.equals(listenerType)) {container.setMessageListener(new InternalEventMessageListener());} else {container.setMessageListener(new RetryMessageListener());}return container;}/*** 获取到每台服务器的Ip地址,组装队列名** @return*/private static String getLinuxLocalIp() {String ip = "";try {for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {NetworkInterface intf = en.nextElement();String name = intf.getName();if (!name.contains("docker") && !name.contains("lo")) {for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements(); ) {InetAddress inetAddress = enumIpAddr.nextElement();if (!inetAddress.isLoopbackAddress()) {String ipaddress = inetAddress.getHostAddress();if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") && !ipaddress.contains("fe80")) {ip = ipaddress;}}}}}} catch (SocketException ex) {log.error("获取ip异常");ip = "127.0.0.1";ex.printStackTrace();}log.info("Linux的IP为:" + ip);return ip;}public static int getRandomNumber() {return (int) ((Math.random() * 9 + 1) * 100000);}
}

2.3 项目运行时,动态的新增/减少Consumer配置

@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {//容器的缓存对象private static ConcurrentHashMap<String, List<SimpleMessageListenerContainer>> containerMapping = new ConcurrentHashMap<>();/*** 挂起消费者* <p>* 将一个队列的所有消费者全部挂起。** @param vhost     虚拟主机* @param queueName 队列名*/public static void blockConsumer(String vhost, String queueName) {//消费者的集合Optional<SimpleMessageListenerContainer> opt = ConsumerInitializer.getContainerMapping(vhost).stream().filter(c -> Arrays.asList(c.getQueueNames()).contains(queueName)).findAny();opt.ifPresent(c -> {//停止消费者c.stop();//在缓存中找到容器信息并删除ConsumerInitializer.getContainerMapping(vhost).remove(c);});}/*** 开启消费者** @param consumer*/public static void openConsumer(RabbitConsumer consumer) {//为每台机器new出现一个监听容器SimpleMessageListenerContainer simpleMessageListenerContainer = ConsumerInitializer.newContainer(consumer);if (simpleMessageListenerContainer != null) {//放入缓存中ConsumerInitializer.getContainerMapping(consumer.getVhost()).add(simpleMessageListenerContainer);//开启消费者simpleMessageListenerContainer.start();}}
}

实体类,消费者信息:

@Data
public class RabbitConsumer {/*** 虚拟主机*/private String vhost;/*** 并发数*/private String concurrency;/*** 一次性获取消息数目* limit 1-999*/private Integer perfetch;/*** 监听的队列名*/private String queueName;/*** 回调的API地址*/private String bizApi;private RetryStrategyEnum retryStrategyEnum;/*** API 返回成功值*/private String bizSuccessCode;/*** API 超时时间*/private Integer timeout;}

3. 监听类配置

3.1 应用队列的监听

该项目中只存在一个监听类,每一个队列都会new一个监听对象来处理。当收到监听信息后,会在RabbitConsumer找到对API地址,通过http回调应用服务器方法。进行消息的处理,当发生异常后,会根据重试策略,将消息重新丢弃或放入到延迟队列(注意的是,若消息是广播模式的那么不会进行重试)。

@Slf4j
@Data
public class NormalListener implements ChannelAwareMessageListener {private RabbitConsumer consumer;public NormalListener() {}public NormalListener(RabbitConsumer consumer) {this.consumer = consumer;}@Overridepublic void onMessage(Message message, Channel channel) {MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class);//处理消息messageHandler.normalProcess(message,consumer);}
}

实现消息异常的重试——放入到延迟队列中。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Optional;/*** @author by yexuerui@xdf.cn* @Date 2020-08-05 16:01*/
@Component
@Slf4j
public class MessageHandler {/*** 消息处理 核心流程** @param consumer 消费者* @return 返回值为处理成功或失败*/public boolean normalProcess(Message message, RabbitConsumer consumer) {String messageId = message.getMessageProperties().getMessageId();try {String url = consumer.getBizApi();log.info("回调业务接口处理消息,url为{}!", url);//获取到配置的urelThread.sleep(2000000);return true;} catch (Exception e) {log.error("An error occurs during process an message in PigeonMessageListener, MessageId : {}", messageId, e);dueException(consumer, message);return false;}}/*** error** @param consumer* @param message*/private void dueException(RabbitConsumer consumer, Message message) {Map<String, Object> headers = message.getMessageProperties().getHeaders();String vhost = (String) headers.get("vhost");String exchange = (String) headers.get("exchange");if (!this.wasFanout(vhost, exchange)) {message.getMessageProperties().setHeader("consumer", JSONObject.toJSONString(consumer));RetryStrategyEnum retryStrategy = Optional.ofNullable(consumer.getRetryStrategyEnum()).orElse(RetryStrategyEnum.NONE);retryPolicy(message, retryStrategy);}}/*** 验证exchange是否为FANOUT类型** @param vhost    vhost* @param exchange exchange* @return exchange type is fouout*/private boolean wasFanout(String vhost, String exchange) {return false;}/*** 重试策略** @param message* @param retryStrategy*/private void retryPolicy(Message message, RetryStrategyEnum retryStrategy) {Map<String, Object> headers = message.getMessageProperties().getHeaders();Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0);switch (retryStrategy) {case NONE:log.info("不进行重试");break;case IMMEDIATELY_THREE_TIMES:if (retryTimes < 3) {log.info("固定时间重试");//放入到延迟队列中,等待下次调用sendToRetry(message, retryStrategy);} else {log.info("固定时间重试——没有重试机会!");}break;case EXPONENTIAL_BACKOFF_10_TIMES:if (retryTimes < 1) {log.info("阶梯时间重试");sendToRetry(message, retryStrategy);} else {log.info("阶梯时间重试——没有重试机会!");}break;default:break;}}public void sendToRetry(Message message, RetryStrategyEnum retryStrategy) {Map<String, Object> headers = message.getMessageProperties().getHeaders();//获取到重试的次数Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0);int delayMillions;if (RetryStrategyEnum.IMMEDIATELY_THREE_TIMES.equals(retryStrategy)) {delayMillions = 1000;} else if (RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES.equals(retryStrategy)) {//指数增加延迟时间delayMillions = (2 << retryTimes) * 1000;} else {log.error("不进行重试!");return;}//设置重试次数message.getMessageProperties().setHeader("retry_times", retryTimes + 1);//设置延迟行log.error("延迟时间:[{}]", delayMillions);message.getMessageProperties().setDelay(delayMillions);//发送消息到延迟队列RabbitTemplate rabbitTemplate = RabbitContextHolder.getRabbitTemplate(RabbitConstants.INTERNAL_VHOST);rabbitTemplate.convertAndSend(RabbitConstants.INTERNAL_DELAYED_EXCHANGE,RabbitConstants.INTERNAL_RETRY_ROUTING_KEY,message);}
}

3.2 延迟队列的监听

当延迟队列收到消息后,会继续对消息进行消费,若消费失败,根据重试策略,对消息选择丢弃或继续放入延迟队列等待处理。

public class RetryMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {Map<String, Object> headers = message.getMessageProperties().getHeaders();String consumerStr = (String)headers.get("consumer");RabbitConsumer consumer = JSONObject.parseObject(consumerStr, RabbitConsumer.class);//设置延迟的时间MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class);messageHandler.normalProcess(message,consumer);}
}

3.3 广播模式队列监听

广播模式的事件类型分为两类:

  1. 在内存中新增/减少消费者;
  2. 在内存中新增/减少RabbitMQ的配置信息(RabbitAdmin、RabbitTemplate等)
@Slf4j
public class InternalEventMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {String body = new String(message.getBody(), StandardCharsets.UTF_8);InternalEvent event = JSONObject.parseObject(body, InternalEvent.class);log.info("内部监听到信息[{}]", JSONObject.toJSONString(event));//下掉一个队列的所有消费者if (InternalEvent.EventType.CONSUMER.equals(event.getEventType())) {if (InternalEvent.Operation.DISABLE == event.getOperation()) {log.info("下掉一个队列上的所有消费者...");ConsumerInitializer.blockConsumer("/test", "test.directQueue-1");} else {//为一个队列增加消费者RabbitConsumer consumer = new RabbitConsumer();consumer.setVhost("/test");consumer.setQueueName("test.directQueue-1");consumer.setConcurrency("1");consumer.setPerfetch(1);ConsumerInitializer.openConsumer(consumer);}} else {//增加虚拟机配置if (InternalEvent.Operation.DISABLE.equals(event.getOperation())) {//广播通知,为内存中的map增加数据RabbitContextHolder.delVHost("/test");} else {//广播通知,为内存中的map增加数据RabbitVirtualHost rabbitVirtualHost = new RabbitVirtualHost();rabbitVirtualHost.setHost("localhost");rabbitVirtualHost.setPort(5672);rabbitVirtualHost.setUsername("guest");rabbitVirtualHost.setPassword("guest");rabbitVirtualHost.setVhost("/test");//为内存中增加数据RabbitContextHolder.addNewVHost(rabbitVirtualHost);}}}
}

事件类配置

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class InternalEvent {@NotNullprivate EventType eventType;@NotNullprivate Operation operation;/*** 通过id去数据库那到虚拟机或者消费者的配置*/@NotNullprivate Integer id;public enum EventType {VHOST,CONSUMER}public enum Operation {ENABLE,DISABLE}
}

4. 项目启动时,初始化内部队列

SpringBoot项目启动时,初始化方法加载顺序可以参考【SpringBoot2.x-1】初始化方法汇总。

@Slf4j
@Component
public class ApplicationStartedListener implements ApplicationListener<ApplicationReadyEvent> {@AutowiredApplicationContext applicationContext;@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {try {ConsumerInitializer.startAllConsumers();log.info("启动消息中心所有的消费者!");ConsumerInitializer.initializeInternalConsumers();log.info("启动 内部 事件监听消费者 && 内容重试消费者");} catch (Exception e) {log.error("", e);SpringApplication.exit(applicationContext);}}
}

【RabbitMQ-9】动态上线下线Consumer相关推荐

  1. Spark独立集群动态上线下线Worker节点

    文章目录 (一)下线Worker节点 (二)上线Worker节点 (一)下线Worker节点 我的操作:关机就是了--

  2. hdfs haadmin使用,DataNode动态上下线,NameNode状态切换管理,数据块的balance,HA下hdfs-api变化(来自学习资料)

    1.2.4集群运维测试 HA集群中两个namenode状态的管理命令 [root@mini2 hadoop-2.6.4]# bin/hdfs haadmin Usage: DFSHAAdmin [-n ...

  3. hdfs haadmin使用,DataNode动态上下线,NameNode状态切换管理,数据块的balance,HA下hdfs-api变化(来自学习资料)...

    1.2.4集群运维测试 HA集群中两个namenode状态的管理命令 [root@mini2 hadoop-2.6.4]# bin/hdfs haadmin Usage: DFSHAAdmin [-n ...

  4. 服务器动态上下线监听案例

    服务器动态上下线监听案例 文章目录 1.需求 2.需求分析 3.编程实现 1.先在集群上创建/servers节点 2.服务器端向Zookeeper注册代码(Server) 3.客户端代码(Client ...

  5. Zookeeper——服务器动态上下线、客户端动态监听

    文章目录: 1.前言 2.实操步骤 2.1 服务端代码 2.2 客户端代码 2.3 测试 1.前言 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线. ...

  6. Zookeeper服务器动态上下线idea上server类中server.regist(args[0])数组越界解决

    Zookeeper服务器动态上下线idea上server类中server.regist(args[0])数组越界解决 运行server服务类时报错:Exception in thread"m ...

  7. 分布式服务动态上下线感知

    分布式服务动态上下线感知 首先,我们要从解决问题的角度得知分布式服务的由来,从单机服务到分布式服务经历了哪些过程 起初,服务是比较单一的,在一个工程包之中会包含所有的模块,但随着互联网的快速发展,客户 ...

  8. zookpeer实现对服务器动态上下线的监听

    服务器动态上下线程序的工作机制 服务器代码: 补充:volatile关键字:java中一切都是对象,当多个线程操作同一个对象时候,该对象会放在堆内存中,而多个线程相当于在多个栈中,当A线程想要去除对象 ...

  9. 【API接口】接口上线下线 用户在线测试,和管理员发布api待完善...

    接口上线下线 上线: //判空 //此接口是否存在 //是否为管理员 //设置数据库status -0:下线 -1:上线 下线: 同上线 用户在线测试 //判空 //是否申请API签名秘钥 -免费试用 ...

最新文章

  1. 手动添加Cookie
  2. Serverless 时代前端避坑指南
  3. 从0开始学习 GitHub 系列之「03.Git 速成」
  4. 批量关闭公众号推送_微信内测新功能:可批量关闭订阅号推送
  5. cvpr 注意力机制_视频人员重识别:关系引导空间注意力 + 时间特征提取模型
  6. Swift 5新特性详解:ABI 稳定终于来了!
  7. matlab求两向量夹角_高等数学之向量代数与空间解析几何知识点与题型总结
  8. 华工计算机学院专硕分数线,2017华南理工大学
  9. python正弦波叠加方波_电赛初探(一)——正弦波、方波、锯齿波转换
  10. 浅析大数据给我们带来的便利和好处
  11. 解决谷歌浏览器Chrome不能播放央视新闻视频的问题
  12. webpack合成sprite图
  13. win7安全模式如何打开计算机管理,Win7安全模式怎么进?Win7进入安全模式方法
  14. 员工离职率预测,练手赛
  15. 深入理解Python中的if语句
  16. 解决谷歌浏览器跨域以及cookie保存失效重复登录
  17. 黑客们的往事(连载十) 凯文·米特尼克
  18. composer 指定PHP版本
  19. 排名靠前的5个编程论坛
  20. vue3利用JS切换背景图

热门文章

  1. 真正厉害的人,你永远察觉不到他的情绪
  2. 【社区图书馆】读《悲惨世界》有感
  3. python wordcloud详解_Python+wordcloud十分钟学会生成英文词云
  4. 淘宝APP商品详情接口(商品信息,价格销量,优惠券信息,详情图等)
  5. webug————显错注入
  6. 基于MATLAB图像检索系统GUI设计解析
  7. a.b.art One时尚智能腕表:这才是智能手表的正确方向
  8. c#俄罗斯方块视频教程下载地址[陈广老师]
  9. Windows下Redis集群搭建(超详细教程)
  10. javascript执行上下文