【RabbitMQ-9】动态上线下线Consumer
文章目录
- 1. 设计理念
- 2. 消费者配置
- 2.1 项目启动后,读取数据库中消费者配置
- 2.2 项目启动时,声明内部队列
- 2.3 项目运行时,动态的新增/减少Consumer配置
- 3. 监听类配置
- 3.1 应用队列的监听
- 3.2 延迟队列的监听
- 3.3 广播模式队列监听
- 4. 项目启动时,初始化内部队列
若实现动态的上线下线Consumer,那么就不能使用
@RabbitListener
方式去声明消费者。
1. 设计理念
- 代码中只会存在一个
监听类
(这个类可以为每一个queue新增一个监听对象),动态的上线下线Consumer,本质上是将Queue加监听对象
中。 - 通过http通信来只能修改一台机器,但集群所有机器都要增加Consumer,需要使用
广播模式
通知到每一台机器。
2. 消费者配置
在【RabbitMQ-8】SpringBoot2.x动态的创建Queue、Exchange、VirtualHost、Binding中,项目启动时,会初始化MQ的上下文对象配置(RabbitContextHolder
)。
2.1 项目启动后,读取数据库中消费者配置
从数据库中拿到消费者的配置信息(下列代码是在静态代码块中进行模拟)。增加消费者,是将消费者与对应虚拟机的队列创建监听容器。在项目启动时,开启监听。
- 这个类依赖
RabbitContextHolder
类; - 采用自动ACK的方式确认消息;
@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {//容器的缓存对象private static ConcurrentHashMap<String, List<SimpleMessageListenerContainer>> containerMapping = new ConcurrentHashMap<>();private static List<RabbitConsumer> rabbitConsumers = new ArrayList<>();//配置中的消费者static {RabbitConsumer consumer = new RabbitConsumer();//虚拟机名consumer.setVhost("/test");//队列名consumer.setQueueName("test.directQueue-1");//开启的消费者线程consumer.setConcurrency("2");//抓取的数量consumer.setPerfetch(10);//重试策略consumer.setRetryStrategyEnum(RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES);rabbitConsumers.add(consumer);}//项目启动时,动态的创建消费者@PostConstructvoid init() {//以虚拟主机分组Map<String, List<RabbitConsumer>> consumerList = rabbitConsumers.stream().collect(Collectors.groupingBy(RabbitConsumer::getVhost));List<SimpleMessageListenerContainer> containerList = new ArrayList<>();//遍历所有的消费者配置consumerList.forEach((vhost, consumers) -> {//判断内存是否缓存连接工厂(根据虚拟主机名)if (RabbitContextHolder.getConnectionFactory(vhost) == null) {log.error("不能连接到虚拟主机[{}]", vhost);return;}//初始化SimpleMessageListenerContainer监听容器consumers.forEach(consumer -> Optional.ofNullable(newContainer(consumer)).ifPresent(containerList::add));//将容器加入到缓存中containerMapping.put(vhost, containerList);});}/*** 创建监听容器** @param consumer 消费者* @return 监听容器*/public static SimpleMessageListenerContainer newContainer(RabbitConsumer consumer) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();//监听的队列container.setQueueNames(consumer.getQueueName());//自动ACKcontainer.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(consumer.getVhost()));container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(consumer.getVhost()));//预取的数量container.setPrefetchCount(consumer.getPerfetch());//是否自动声明 队列、交换机、绑定container.setAutoDeclare(false);//设置初始化后是否自动启动容器。container.setAutoStartup(true);//是否设置排他性,即该消费者独享该队列container.setExclusive(false);//消费者开启几个线程container.setConcurrency(consumer.getConcurrency());//消费者的监听container.setMessageListener(new NormalListener(consumer));//设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。container.setDefaultRequeueRejected(false);//当监听类中不处理异常,那么异常会回调该方法。container.setErrorHandler(e -> log.error("个性化处理:丢弃消息啦!"));//初始化这个类container.afterPropertiesSet();return container;}/*** 项目启动时,启动监听容器。*/public static void startAllConsumers() {containerMapping.values().forEach(it -> {for (SimpleMessageListenerContainer bean : it) {bean.start();}});}
}
2.2 项目启动时,声明内部队列
- 为什么需要创建内部——广播模式的队列
- 若是动态的增加/减少虚拟机配置,那么需要给集群所有机器的内存新增/减少配置,即调用
RabbitContextHolder#addNewVHost
方法。 - 若是动态的上线/下线Consumer,也需要为给集群所有机器的内存增加/减少消费者容器配置。
广播模式注意点:集群中每台机器在启动时都生成一个队列(队列名由Linux机器的ip地址+随机数组合成),每台机器创建一个Consumer去监听消息。当机器重启时,消费者会下线,对应的广播队列会自动删除。
- 为什么需要创建内部——延迟队列
因为上面采用的是自动ACK模式,当发生异常时,会将消息放入到延迟队列中等待下次消费。
@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {/*** 启动内部队列** @throws IOException*/public static void initializeInternalConsumers() throws IOException {//启动内部监听(广播监听)startInternalEventConsumer();//当消费者出现异常,开启延迟队列进行重试startRetryEventConsumer();}/*** 开启内部监听(广播模式)* <p>* 项目启动后,本机器开启(广播)消息监听。** @throws IOException*/private static void startInternalEventConsumer() {//为每一台机器声明一个独一无二的队列名,当机器上消费者下线时,自动删除。String queueName = String.format(RabbitConstants.INTERNAL_EVENT_QUEUE_TEMPLATE, getLinuxLocalIp(), getRandomNumber());RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST);Exchange exchange = ExchangeBuilder.fanoutExchange(RabbitConstants.INTERNAL_EVENT_EXCHANGE).durable(true).build();rabbitAdmin.declareExchange(exchange);rabbitAdmin.declareQueue(new Queue(queueName, true, false, true));//广播模式的路由键为""rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, RabbitConstants.INTERNAL_EVENT_EXCHANGE, "", null));//创建内部队列SimpleMessageListenerContainer simpleMessageListenerContainer =newInternalContainer(queueName, "1", ListenerType.NORMAL);simpleMessageListenerContainer.start();}/*** 创建延迟队列*/public static void startRetryEventConsumer() {RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST);Exchange directExchange = ExchangeBuilder.directExchange(RabbitConstants.INTERNAL_DELAYED_EXCHANGE).delayed().durable(true).build();rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareQueue(new Queue(RabbitConstants.INTERNAL_RETRY_QUEUE, true, false, false));rabbitAdmin.declareBinding(new Binding(RabbitConstants.INTERNAL_RETRY_QUEUE,Binding.DestinationType.QUEUE,RabbitConstants.INTERNAL_DELAYED_EXCHANGE,RabbitConstants.INTERNAL_RETRY_ROUTING_KEY,null));//3-10表示初始化开启3个线程,当负载增加时,线程数缓慢得增加到10个线程。SimpleMessageListenerContainer simpleMessageListenerContainer = newInternalContainer(RabbitConstants.INTERNAL_RETRY_QUEUE,"3-10",ListenerType.RETRY);simpleMessageListenerContainer.start();}/*** 创建内部监听容器(广播模式&&延迟队列)** @param queueNames 队列名* @param concurrency 并发数* @return*/public static SimpleMessageListenerContainer newInternalContainer(String queueNames, String concurrency, ListenerType listenerType) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setQueueNames(queueNames);container.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST));container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(RabbitConstants.INTERNAL_VHOST));container.setAutoStartup(true);container.setAutoDeclare(false);container.setExclusive(false);//该参数默认是250,按照参数修改。container.setPrefetchCount(1);//3-10 默认是3个,若是负载上来的话,会缓慢增长到10个。container.setConcurrency(concurrency);if (ListenerType.NORMAL.equals(listenerType)) {container.setMessageListener(new InternalEventMessageListener());} else {container.setMessageListener(new RetryMessageListener());}return container;}/*** 获取到每台服务器的Ip地址,组装队列名** @return*/private static String getLinuxLocalIp() {String ip = "";try {for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {NetworkInterface intf = en.nextElement();String name = intf.getName();if (!name.contains("docker") && !name.contains("lo")) {for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements(); ) {InetAddress inetAddress = enumIpAddr.nextElement();if (!inetAddress.isLoopbackAddress()) {String ipaddress = inetAddress.getHostAddress();if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") && !ipaddress.contains("fe80")) {ip = ipaddress;}}}}}} catch (SocketException ex) {log.error("获取ip异常");ip = "127.0.0.1";ex.printStackTrace();}log.info("Linux的IP为:" + ip);return ip;}public static int getRandomNumber() {return (int) ((Math.random() * 9 + 1) * 100000);}
}
2.3 项目运行时,动态的新增/减少Consumer配置
@Slf4j
@DependsOn("rabbitContextHolder")
@Component
public class ConsumerInitializer {//容器的缓存对象private static ConcurrentHashMap<String, List<SimpleMessageListenerContainer>> containerMapping = new ConcurrentHashMap<>();/*** 挂起消费者* <p>* 将一个队列的所有消费者全部挂起。** @param vhost 虚拟主机* @param queueName 队列名*/public static void blockConsumer(String vhost, String queueName) {//消费者的集合Optional<SimpleMessageListenerContainer> opt = ConsumerInitializer.getContainerMapping(vhost).stream().filter(c -> Arrays.asList(c.getQueueNames()).contains(queueName)).findAny();opt.ifPresent(c -> {//停止消费者c.stop();//在缓存中找到容器信息并删除ConsumerInitializer.getContainerMapping(vhost).remove(c);});}/*** 开启消费者** @param consumer*/public static void openConsumer(RabbitConsumer consumer) {//为每台机器new出现一个监听容器SimpleMessageListenerContainer simpleMessageListenerContainer = ConsumerInitializer.newContainer(consumer);if (simpleMessageListenerContainer != null) {//放入缓存中ConsumerInitializer.getContainerMapping(consumer.getVhost()).add(simpleMessageListenerContainer);//开启消费者simpleMessageListenerContainer.start();}}
}
实体类,消费者信息:
@Data
public class RabbitConsumer {/*** 虚拟主机*/private String vhost;/*** 并发数*/private String concurrency;/*** 一次性获取消息数目* limit 1-999*/private Integer perfetch;/*** 监听的队列名*/private String queueName;/*** 回调的API地址*/private String bizApi;private RetryStrategyEnum retryStrategyEnum;/*** API 返回成功值*/private String bizSuccessCode;/*** API 超时时间*/private Integer timeout;}
3. 监听类配置
3.1 应用队列的监听
该项目中只存在一个监听类,每一个队列都会new一个监听对象来处理。当收到监听信息后,会在RabbitConsumer
找到对API地址,通过http回调应用服务器方法。进行消息的处理,当发生异常后,会根据重试策略,将消息重新丢弃或放入到延迟队列(注意的是,若消息是广播模式的那么不会进行重试)。
@Slf4j
@Data
public class NormalListener implements ChannelAwareMessageListener {private RabbitConsumer consumer;public NormalListener() {}public NormalListener(RabbitConsumer consumer) {this.consumer = consumer;}@Overridepublic void onMessage(Message message, Channel channel) {MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class);//处理消息messageHandler.normalProcess(message,consumer);}
}
实现消息异常的重试——放入到延迟队列中。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Optional;/*** @author by yexuerui@xdf.cn* @Date 2020-08-05 16:01*/
@Component
@Slf4j
public class MessageHandler {/*** 消息处理 核心流程** @param consumer 消费者* @return 返回值为处理成功或失败*/public boolean normalProcess(Message message, RabbitConsumer consumer) {String messageId = message.getMessageProperties().getMessageId();try {String url = consumer.getBizApi();log.info("回调业务接口处理消息,url为{}!", url);//获取到配置的urelThread.sleep(2000000);return true;} catch (Exception e) {log.error("An error occurs during process an message in PigeonMessageListener, MessageId : {}", messageId, e);dueException(consumer, message);return false;}}/*** error** @param consumer* @param message*/private void dueException(RabbitConsumer consumer, Message message) {Map<String, Object> headers = message.getMessageProperties().getHeaders();String vhost = (String) headers.get("vhost");String exchange = (String) headers.get("exchange");if (!this.wasFanout(vhost, exchange)) {message.getMessageProperties().setHeader("consumer", JSONObject.toJSONString(consumer));RetryStrategyEnum retryStrategy = Optional.ofNullable(consumer.getRetryStrategyEnum()).orElse(RetryStrategyEnum.NONE);retryPolicy(message, retryStrategy);}}/*** 验证exchange是否为FANOUT类型** @param vhost vhost* @param exchange exchange* @return exchange type is fouout*/private boolean wasFanout(String vhost, String exchange) {return false;}/*** 重试策略** @param message* @param retryStrategy*/private void retryPolicy(Message message, RetryStrategyEnum retryStrategy) {Map<String, Object> headers = message.getMessageProperties().getHeaders();Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0);switch (retryStrategy) {case NONE:log.info("不进行重试");break;case IMMEDIATELY_THREE_TIMES:if (retryTimes < 3) {log.info("固定时间重试");//放入到延迟队列中,等待下次调用sendToRetry(message, retryStrategy);} else {log.info("固定时间重试——没有重试机会!");}break;case EXPONENTIAL_BACKOFF_10_TIMES:if (retryTimes < 1) {log.info("阶梯时间重试");sendToRetry(message, retryStrategy);} else {log.info("阶梯时间重试——没有重试机会!");}break;default:break;}}public void sendToRetry(Message message, RetryStrategyEnum retryStrategy) {Map<String, Object> headers = message.getMessageProperties().getHeaders();//获取到重试的次数Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0);int delayMillions;if (RetryStrategyEnum.IMMEDIATELY_THREE_TIMES.equals(retryStrategy)) {delayMillions = 1000;} else if (RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES.equals(retryStrategy)) {//指数增加延迟时间delayMillions = (2 << retryTimes) * 1000;} else {log.error("不进行重试!");return;}//设置重试次数message.getMessageProperties().setHeader("retry_times", retryTimes + 1);//设置延迟行log.error("延迟时间:[{}]", delayMillions);message.getMessageProperties().setDelay(delayMillions);//发送消息到延迟队列RabbitTemplate rabbitTemplate = RabbitContextHolder.getRabbitTemplate(RabbitConstants.INTERNAL_VHOST);rabbitTemplate.convertAndSend(RabbitConstants.INTERNAL_DELAYED_EXCHANGE,RabbitConstants.INTERNAL_RETRY_ROUTING_KEY,message);}
}
3.2 延迟队列的监听
当延迟队列收到消息后,会继续对消息进行消费,若消费失败,根据重试策略,对消息选择丢弃或继续放入延迟队列等待处理。
public class RetryMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {Map<String, Object> headers = message.getMessageProperties().getHeaders();String consumerStr = (String)headers.get("consumer");RabbitConsumer consumer = JSONObject.parseObject(consumerStr, RabbitConsumer.class);//设置延迟的时间MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class);messageHandler.normalProcess(message,consumer);}
}
3.3 广播模式队列监听
广播模式的事件类型分为两类:
- 在内存中新增/减少消费者;
- 在内存中新增/减少RabbitMQ的配置信息(RabbitAdmin、RabbitTemplate等)
@Slf4j
public class InternalEventMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {String body = new String(message.getBody(), StandardCharsets.UTF_8);InternalEvent event = JSONObject.parseObject(body, InternalEvent.class);log.info("内部监听到信息[{}]", JSONObject.toJSONString(event));//下掉一个队列的所有消费者if (InternalEvent.EventType.CONSUMER.equals(event.getEventType())) {if (InternalEvent.Operation.DISABLE == event.getOperation()) {log.info("下掉一个队列上的所有消费者...");ConsumerInitializer.blockConsumer("/test", "test.directQueue-1");} else {//为一个队列增加消费者RabbitConsumer consumer = new RabbitConsumer();consumer.setVhost("/test");consumer.setQueueName("test.directQueue-1");consumer.setConcurrency("1");consumer.setPerfetch(1);ConsumerInitializer.openConsumer(consumer);}} else {//增加虚拟机配置if (InternalEvent.Operation.DISABLE.equals(event.getOperation())) {//广播通知,为内存中的map增加数据RabbitContextHolder.delVHost("/test");} else {//广播通知,为内存中的map增加数据RabbitVirtualHost rabbitVirtualHost = new RabbitVirtualHost();rabbitVirtualHost.setHost("localhost");rabbitVirtualHost.setPort(5672);rabbitVirtualHost.setUsername("guest");rabbitVirtualHost.setPassword("guest");rabbitVirtualHost.setVhost("/test");//为内存中增加数据RabbitContextHolder.addNewVHost(rabbitVirtualHost);}}}
}
事件类配置
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class InternalEvent {@NotNullprivate EventType eventType;@NotNullprivate Operation operation;/*** 通过id去数据库那到虚拟机或者消费者的配置*/@NotNullprivate Integer id;public enum EventType {VHOST,CONSUMER}public enum Operation {ENABLE,DISABLE}
}
4. 项目启动时,初始化内部队列
SpringBoot项目启动时,初始化方法加载顺序可以参考【SpringBoot2.x-1】初始化方法汇总。
@Slf4j
@Component
public class ApplicationStartedListener implements ApplicationListener<ApplicationReadyEvent> {@AutowiredApplicationContext applicationContext;@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {try {ConsumerInitializer.startAllConsumers();log.info("启动消息中心所有的消费者!");ConsumerInitializer.initializeInternalConsumers();log.info("启动 内部 事件监听消费者 && 内容重试消费者");} catch (Exception e) {log.error("", e);SpringApplication.exit(applicationContext);}}
}
【RabbitMQ-9】动态上线下线Consumer相关推荐
- Spark独立集群动态上线下线Worker节点
文章目录 (一)下线Worker节点 (二)上线Worker节点 (一)下线Worker节点 我的操作:关机就是了--
- hdfs haadmin使用,DataNode动态上下线,NameNode状态切换管理,数据块的balance,HA下hdfs-api变化(来自学习资料)
1.2.4集群运维测试 HA集群中两个namenode状态的管理命令 [root@mini2 hadoop-2.6.4]# bin/hdfs haadmin Usage: DFSHAAdmin [-n ...
- hdfs haadmin使用,DataNode动态上下线,NameNode状态切换管理,数据块的balance,HA下hdfs-api变化(来自学习资料)...
1.2.4集群运维测试 HA集群中两个namenode状态的管理命令 [root@mini2 hadoop-2.6.4]# bin/hdfs haadmin Usage: DFSHAAdmin [-n ...
- 服务器动态上下线监听案例
服务器动态上下线监听案例 文章目录 1.需求 2.需求分析 3.编程实现 1.先在集群上创建/servers节点 2.服务器端向Zookeeper注册代码(Server) 3.客户端代码(Client ...
- Zookeeper——服务器动态上下线、客户端动态监听
文章目录: 1.前言 2.实操步骤 2.1 服务端代码 2.2 客户端代码 2.3 测试 1.前言 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线. ...
- Zookeeper服务器动态上下线idea上server类中server.regist(args[0])数组越界解决
Zookeeper服务器动态上下线idea上server类中server.regist(args[0])数组越界解决 运行server服务类时报错:Exception in thread"m ...
- 分布式服务动态上下线感知
分布式服务动态上下线感知 首先,我们要从解决问题的角度得知分布式服务的由来,从单机服务到分布式服务经历了哪些过程 起初,服务是比较单一的,在一个工程包之中会包含所有的模块,但随着互联网的快速发展,客户 ...
- zookpeer实现对服务器动态上下线的监听
服务器动态上下线程序的工作机制 服务器代码: 补充:volatile关键字:java中一切都是对象,当多个线程操作同一个对象时候,该对象会放在堆内存中,而多个线程相当于在多个栈中,当A线程想要去除对象 ...
- 【API接口】接口上线下线 用户在线测试,和管理员发布api待完善...
接口上线下线 上线: //判空 //此接口是否存在 //是否为管理员 //设置数据库status -0:下线 -1:上线 下线: 同上线 用户在线测试 //判空 //是否申请API签名秘钥 -免费试用 ...
最新文章
- 手动添加Cookie
- Serverless 时代前端避坑指南
- 从0开始学习 GitHub 系列之「03.Git 速成」
- 批量关闭公众号推送_微信内测新功能:可批量关闭订阅号推送
- cvpr 注意力机制_视频人员重识别:关系引导空间注意力 + 时间特征提取模型
- Swift 5新特性详解:ABI 稳定终于来了!
- matlab求两向量夹角_高等数学之向量代数与空间解析几何知识点与题型总结
- 华工计算机学院专硕分数线,2017华南理工大学
- python正弦波叠加方波_电赛初探(一)——正弦波、方波、锯齿波转换
- 浅析大数据给我们带来的便利和好处
- 解决谷歌浏览器Chrome不能播放央视新闻视频的问题
- webpack合成sprite图
- win7安全模式如何打开计算机管理,Win7安全模式怎么进?Win7进入安全模式方法
- 员工离职率预测,练手赛
- 深入理解Python中的if语句
- 解决谷歌浏览器跨域以及cookie保存失效重复登录
- 黑客们的往事(连载十) 凯文·米特尼克
- composer 指定PHP版本
- 排名靠前的5个编程论坛
- vue3利用JS切换背景图