mqttClient是一个储存在内存中的对象,一旦服务器重启或者项目发版,必须让刚刚正在工作的消费方重新启动起来
同时我们需要将这些对象储存起来,方便我们手动关闭监听

一个仓库+一个线程哨兵

上代码

package com.datacvg.config;import com.datacvg.dao.ExchangeMqWithDatabaseMapper;
import com.datacvg.mqtt.DatasourceCrossClient;
import com.datacvg.vo.ExchangeMqPushDatabaseVo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;/*** @Author: 商朝* @Date: 2021.4.15* @Function: 正在运行客户端的仓库+重启服务器扫描正在运行的服务重启并入库*/@Component
public class RestartMqttClientConfig {private final HashMap<String,DatasourceCrossClient> runningClient =new HashMap<>();@AutowiredExchangeMqWithDatabaseMapper exchangeMqWithDatabaseMapper;@AutowiredRestartMqttClientConfig checkStock;@PostConstructpublic void restartListen(){try {//查出正在运行的服务List<ExchangeMqPushDatabaseVo> listClient = exchangeMqWithDatabaseMapper.queryAllRunningClient();//用于储存已经开服务的客户端和datasourceId的对应关系Map<String, DatasourceCrossClient> tempDatasourceCrossClient = new HashMap<>();//存放clientId正在被占用的客户端(假如有两个clientId重复 会停掉前者)-即将要停掉的服务List<String> listWillBeStopedMq = new LinkedList<>();//哨兵站岗new Thread(new CheckStock(checkStock,exchangeMqWithDatabaseMapper)).start();if (CollectionUtils.isEmpty(listClient)) return;for (ExchangeMqPushDatabaseVo mqPushVO : listClient) {DatasourceCrossClient crossClient = new DatasourceCrossClient();//启动服务crossClient.crossSensationLinkListenAndInsert(mqPushVO.getMqUsername(), mqPushVO.getMqPassword(), mqPushVO.getMqUrl(), mqPushVO.getMqTopicname(), mqPushVO.getMqClientId(), mqPushVO.getTableName());//添加对应关系tempDatasourceCrossClient.put(mqPushVO.getDatasourceId(), crossClient);}if (tempDatasourceCrossClient.size() != 0) {Set<Map.Entry<String, DatasourceCrossClient>> entries = tempDatasourceCrossClient.entrySet();for (Map.Entry<String, DatasourceCrossClient> entry : entries) {if (!entry.getValue().getClientIdCouldBeUsed()) {listWillBeStopedMq.add(entry.getKey());} else {//入库setRunningClient(entry.getKey(), entry.getValue());}}}if (listWillBeStopedMq.size() != 0) {//改表exchangeMqWithDatabaseMapper.updateToStop(listWillBeStopedMq);}}catch (Exception e){e.printStackTrace();}}public HashMap<String,DatasourceCrossClient> getRunningClient(){return this.runningClient;}public void setRunningClient(String sourceObject,DatasourceCrossClient crossClient){this.runningClient.put(sourceObject,crossClient);}public void removeRunningClient(List<String> sourceObject){for (String s : sourceObject) {this.runningClient.remove(s);}}}
/**
*
*每隔三秒检索一次
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class CheckStock implements Runnable{RestartMqttClientConfig stock;ExchangeMqWithDatabaseMapper exchangeMqWithDatabaseMapper;public void run() {while (true){try {Thread.sleep(3000);LinkedList<String> list =new LinkedList<>();if(stock.getRunningClient().isEmpty()){continue;}Set<Map.Entry<String, DatasourceCrossClient>> entries = stock.getRunningClient().entrySet();List<String> willBeStoped = exchangeMqWithDatabaseMapper.queryAllStoppedClient();if(CollectionUtils.isEmpty(willBeStoped)){continue;}for (String s : willBeStoped) {if(stock.getRunningClient().containsKey(s)){DatasourceCrossClient datasourceCrossClient = stock.getRunningClient().get(s);try {datasourceCrossClient.getMqttClient().disconnect();datasourceCrossClient.getMqttClient().close();}catch (Exception e){e.printStackTrace();}}}if(entries.size()!=0){for (Map.Entry<String, DatasourceCrossClient> entry : entries) {if(!entry.getValue().getClientIdCouldBeUsed()){list.add(entry.getKey());}}}if(list.size()!=0){stock.removeRunningClient(list);exchangeMqWithDatabaseMapper.updateToStop(list);}} catch (InterruptedException e) {e.printStackTrace();}}}
}

Mqtt服务自动重启,以及防止clientId重复被顶相关推荐

  1. android服务自动重启,安卓service关闭后怎么自动重启

    满意答案 首先申明service关闭有两种情况: 1.程序进入后台,系统可能会销毁应用,可以理解为android端监听推送消息的服务在启动后是一直在后台运行的,但是当内存不足时,或者第三方应用清理内存 ...

  2. swoft实现自动重启服务

    目的: 1.上传代码后HTTP服务自动重启,不需要自己手动执行:php bin/swoft http:start 2.自动重启适用于开发调试阶段,因为不能再后台运行所以在线上环境的话还是要重启http ...

  3. swoft2.x swoftCli 自动重启服务

    目的: 上传代码后HTTP服务自动重启,不需要自己手动执行:php bin/swoft http:start 自动重启适用于开发调试阶段,因为不能再后台运行所以在线上环境的话还是要重启http服务 下 ...

  4. vm设置虚拟服务器定时重启,vm服务器设置自动重启

    vm服务器设置自动重启 内容精选 换一换 云硬盘挂载至云服务器后,需要登录云服务器初始化云硬盘,即格式化云硬盘,之后云硬盘才可以正常使用.云耀云服务器磁盘初始化的操作方法与ECS相同,本节操作介绍使用 ...

  5. 服务器系统具备自检能力,服务器内存自检自动重启

    服务器内存自检自动重启 内容精选 换一换 会.弹性云服务器运行在物理机上,虽然提供了多种机制来保证系统的可靠性.容错能力和高可用性,但是,服务器的硬件.电源等部件仍有较小概率的损坏.如果物理设备的损坏 ...

  6. Node.js笔记 - 修改文件后自动重启node服务

    每次修改代码后都要手动重启node服务,虽然不是复杂的事,但是每次都要这么搞一次颇为麻烦. 所以nodemon登场了! nodemon会监测你已运行程序中的文件和目录,一旦被修改,它会自动重启node ...

  7. Android的服务(Service)(二)Service的自动重启问题

    继续上篇的分析,接下来是第二个问题"Service的自动重启问题" (一).Service的生命周期 (二).Service的自动重启问题 这里要说服务的自动重启问题,这个问题其实 ...

  8. Linux之systemd服务配置及自动重启

    Linux之systemd服务配置及自动重启 0 背景 在linux上开发时,往往需要将自己的程序做成服务,并且实现服务开机自动重启,以及服务崩溃后自动重启功能,本文就对该功能的实现做简单介绍,实现方 ...

  9. docker certbot 一键申请https证书、证书过期续订、续订成功自动重启服务

    前言 基于**certbot-letencrypt-wildcardcertificates-alydns-au**项目封装为docker镜像, 感谢 ywdblog 的源码 镜像支持一键生成证书.证 ...

最新文章

  1. gatsby_将您的GraphCMS数据导入Gatsby
  2. Java中关于进程和线程的理解
  3. python使用手册-Python参考手册(第4版)
  4. 浅谈三个星期零基础入门学习Thinkphp5开发restful-api接口的心得和总结
  5. C++STL容器,你真的会用了吗?——插入、删除、遍历和查找操作性能对比——删除(精简易懂版,句句干货)
  6. HtmlAgilityPack的简单使用
  7. 深入研究Java中一个对象的初始化过程
  8. VS2015+NUnit+OpenCover 完成单元测试代码覆盖率测试
  9. SpringBoot连接Redis服务出现DENIED Redis is running in protected mode because protected mode is enabled
  10. diskgenius创建efi分区_DISKGEN 专业版修改硬盘为GPT分区 ESP分区图文教程
  11. java版简易计算器,java 简易计算器
  12. 速卖通关键词挖掘工具_2020网站关键词挖掘工具有哪些
  13. 蓝电电池测试系统工步编辑软件,CT2001A
  14. Typora-Markdown编辑器语法
  15. 数学建模技巧总结(一)
  16. 【原创】博物馆库房环境空气质量无线温湿度监控技术性方案
  17. 深信服网络挑战赛初赛_2019
  18. 2020科大讯飞iFLYTEK A.I.开发者大赛
  19. TC118S/TC118H单通道直流马达驱动IC
  20. Brackets sequence UVA - 1626(区间DP)

热门文章

  1. Camera sensor bring up
  2. 新版白话空间统计(13):随机的力量
  3. 旅游景区智能分析-需求文档
  4. 智能自适应边缘系统:探索与挑战
  5. 机器视觉检测技术在螺丝螺母缺陷检测中的应用
  6. 四、BIRT数据集和参数的建立
  7. Allot流量控制系统软件升级过程
  8. 水利类专业怎么应用计算机,水利水电工程专业系列教材·计算机应用技术
  9. CF667DIV3-F:dp
  10. EPLAN报表设备元件添加,清单生成