在EMQ官网上拿张图哈^_^;;

本来就是在做物联网项目嘛,MQTT协议肯定是必须要的嘛,但之前不是我来负责这一块的,就没有对MQTT以及EMQ有更多的理解,只是会用能用罢了,要是让我说个1 2 3 ,肯定是不行的呀,最近有一个项目刚好我来对接开发,而且是MQTT协议的,由于受测试环境的限制,只能在本地笔记本上window上搭建一套EMQ环境来试试啦;

一、EMQX安装;

1、下载EMQ X Broker压缩包;这里放了官网最新版的https://www.emqx.io/downloads/broker/v4.1.0/emqx-windows-v4.1.0.zip;下面文章中的使用的是 “emqx-windows10-v3.2.0.zip”;

2、在笔记本本地目录直接解压就行啦;

3、进入解压后的目录;C:\OldData_Win7\emqx-windows10-v3.2.0\emqx\bin;

4、进入EMQX启动命令目录;按下shift键+鼠标右键,选择 ‘open PowerShell window here’;

5、启动EMQX服务;输入./emqx console;会弹出erlang的后台界面;

输入./emqx stop;即可停止服务啦;

输入./emqx start;即可轻松启动服务啦;(推荐)

6、启动成功后,EMQX自带的dashboard既可以访问啦;http://localhost:18083/#/login 默认u/p:admin/public

     7、到这里我们的EMQ X Broker就已经可以使用了;

二、java连接EMQX,并发起订阅发布的操作;

1、因为客户端和服务端是都可以发布和订阅的;所以我们就以发布和订阅来区分吧;

订阅端sub:(启动main方法就可以连接到我们本地的EMQX了,哦,我这里使用了共享订阅的,random策略)

package com.daopin.project.mqtt;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class SubMsg0 {private static final Logger logger = LoggerFactory.getLogger(SubMsg0.class);//private static String topic = "$share/group/test1";//private static String topic = "$queue/test1";//private static String topic = "test1";private static int qos = 0;private static String broker = "tcp://127.0.0.1:1883";private static String userName = "COAP";private static String passWord = "coap";private static String clientId = "nokia-mqtt-cluster-0";/*** 有三种消息发布服务质量:* <p>*   “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。* <p>*   “至少一次”,确保消息到达,但消息重复可能会发生。* <p>*   “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。*/private static MqttClient connect(String clientId) throws MqttException {MemoryPersistence persistence = new MemoryPersistence();MqttConnectOptions connOpts = new MqttConnectOptions();//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"};// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 这里设置为true表示每次连接到服务器都以新的身份连接(客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息)// 这里设置为false表示客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效connOpts.setCleanSession(true);connOpts.setUserName(userName);connOpts.setPassword(passWord.toCharArray());connOpts.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制connOpts.setKeepAliveInterval(20);connOpts.setAutomaticReconnect(true);connOpts.setMaxInflight(10);//connOpts.setServerURIs(uris);//setWill方法(遗嘱),如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息//connOpts.setWill(topic, "close".getBytes(), 2, true);// MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient mqttClient = new MqttClient(broker, clientId, persistence);mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectionLost(Throwable throwable) {//在断开连接时调用 连接丢失后,一般在这里面进行重连logger.warn("connectionLost ... ; We will do something ...");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {//接收已经预订的发布logger.info("topic - > " + topic + ", mqttMessage - > " + mqttMessage);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用logger.warn("deliveryComplete ... ; We will do something ...");}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*subscribe();*/ //连接成功,需要上传客户端所有的订阅关系logger.warn("connectComplete ... ; We will do something ...");}});mqttClient.connect(connOpts);return mqttClient;}public static void sub(MqttClient mqttClient, String topic) throws MqttException {int[] Qos = {qos};String[] topics = {topic};mqttClient.subscribe(topics, Qos);logger.info("sub >> " + topic);}private static void runsub(String clientId, String topic) throws MqttException {MqttClient mqttClient = connect(clientId);if (mqttClient != null) {sub(mqttClient, topic);}}public static void main(String[] args) throws MqttException {runsub(clientId, "$queue/qdq02mzl6kvs/coap-server/uplinkMsg");}
}

2、发布端pub;

package com.daopin.project.mqtt;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Date;
import java.util.concurrent.*;public class PubMsg {private static final Logger logger = LoggerFactory.getLogger(PubMsg.class);private static int qos = 0;private static String broker = "tcp://127.0.0.1:1883";private static String userName = "COAP";private static String passWord = "coap";public static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();public static ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());private static MqttClient connect(String clientId, String userName,String password) throws MqttException {MemoryPersistence persistence = new MemoryPersistence();MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());connOpts.setConnectionTimeout(10);connOpts.setKeepAliveInterval(20);//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};//connOpts.setServerURIs(uris);  //起到负载均衡和高可用的作用MqttClient mqttClient = new MqttClient(broker, clientId, persistence);mqttClient.setCallback(new PushCallback("test"));mqttClient.connect(connOpts);return mqttClient;}private static void pub(MqttClient sampleClient, String msg, String topic)throws Exception {while (true){MqttMessage message = new MqttMessage((msg+" "+new Date()).getBytes());message.setQos(qos);message.setRetained(false);sampleClient.publish(topic, message);logger.info("pub-->" + message);Thread.sleep(3000L);}}private static void publish(String str, String clientId, String topic)  {try {MqttClient mqttClient = connect(clientId, userName, passWord);if (mqttClient != null) {pub(mqttClient, str, topic);}if (mqttClient != null) {mqttClient.disconnect();}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws MqttException {//singleThreadPool.execute( ()-> publish("message content", "868333030030008", "sl6o3lbk94xg/868333030030008/uplinkMsg/0/data"));publish("AAAA0000", "nokia-mqtt-server", "qdq02mzl6kvs/coap-server/uplinkMsg");}
}class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);private String threadId;public PushCallback(String threadId) {this.threadId = threadId;}@Overridepublic void connectionLost(Throwable cause) {//在断开连接时调用 连接丢失后,一般在这里面进行重连logger.warn("connectionLost ... ; We will do something ...");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {//System.out.println("deliveryComplete---------" + token.isComplete());//接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用logger.warn("deliveryComplete ... ; We will do something ...");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String msg = new String(message.getPayload());System.out.println(threadId + " " + msg);}
}

我们的订阅端也已经收到这条pub的数据了。。。

3、我们也可以在 dashboard来观察到这两个会话;

三、总结;

1、共享订阅

包含一个主题过滤器和订阅选项,唯一的区别在于共享订阅的主题过滤器格式必须是 $share/{ShareName}/{filter} 这种形式。这几个的字段的含义分别是:

$share 前缀表明这将是一个共享订阅
              {ShareName} 是一个不包含 “/”, “+” 以及 “#” 的字符串。订阅会话通过使用相同的 {ShareName} 表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话
               {filter} 即非共享订阅中的主题过滤器

共享订阅使得订阅端能够负载均衡地消费消息,但 MQTT 协议并没有规定 Server 应当使用什么负载均衡策略。作为参考,EMQ X 提供了 random, round_robin, sticky, hash 四种策略供用户自行选择。

2、connetcLost方法

我们最好要在这里做一些事情,抛出异常或者打印日志,这样你才能知道到底什么时候连接丢失了的

3、 消息发布服务质量

如何选择QoS:
                 QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别,比如在同一个子网内部的服务间的消息交互往往选用 QoS 0;而通过互联网的实时消息通信往往选用 QoS 1;QoS 2 使用的场景相对少一些,适合一些支付请求之类的要求较高的场景。

4、CleanSession参数;若是共享订阅模式下,需要将此字段配置为true,保证在订阅端有一个掉线的情况下,可以清除掉session信息,这样就不会收到订阅的pub信息了,避免了数据丢失的问题。

EMQ X - EmqxBroker Window10环境安装部署以及发布订阅测试相关推荐

  1. HoloLens开发环境安装部署(基于Unity2018.4/VS2017/MRTK)

    HoloLens是微软的MR开发环境,网上有很多部署教程,我这里推一个博主的部署方法,大体上都差不多.用微软的官方平台把UWP程序部署到HoloLens眼镜里. HoloLens开发环境安装部署 一. ...

  2. java做flv直播服务器,EasyDSS流媒体服务器软件(支持RTMP/HLS/HTTP-FLV/视频点播/视频直播)-正式环境安装部署攻略...

    EasyDSS流媒体服务器软件,提供一站式的转码.点播.直播.时移回放服务,极大地简化了开发和集成的工作. 其中,点播功能主要包含:上传.转码.分发.直播功能,主要包含:直播.录像, 直播支持RTMP ...

  3. Dynamic CRM9.0 环境安装部署手册步骤和遇到的一些问题解决方案(包含ADFS部署)

    Dynamic CRM9.0 环境安装部署手册 Dynamic 365和ADFS配置安装过程踩了一些坑,拿出来和大家记录分享一下. 目录 Dynamic CRM9.0 环境安装部署手册 一.Activ ...

  4. Nexus环境安装部署及使用

    目录 一.Nexus介绍 二.2.X环境安装部署 1.Nexus官网下载地址 2.解压到本地 3.修改Nexus端口(默认为8081) 4.开放防火墙 5.修改启动文件 6.启动 7.访问nexus ...

  5. 知识图谱实战开发案例剖析(22)Protege开发环境安装部署

    一.前言 本文是系列博文<知识图谱实战开发案例剖析>第7部分:Protégé本体建模,第1节:Protégé环境安装部署.该系列内容已经录制成视频课程,发布在:网易云课堂. 二.正文 2. ...

  6. Kubernetes 生产环境安装部署 基于 Kubernetes v1.14.0 之 etcd集群

    说明:没有明确注明在某台服务器,都是在k8s-operation 工作服务器完成 K8S node 节点数大于2000 节点 k8s-operation 目录规划,工作目录/apps/work/k8s ...

  7. linux环境安装部署mark

    2019独角兽企业重金招聘Python工程师标准>>> 以下linux环境的安装部署,其中碰到不少坑,这里MARK下. linux运营一个网站,需要安装的东西基本如下: ====== ...

  8. PHP7.2与apache环境安装部署详细流程

    php-agent 安装部署详细流程 一.环境 测试机内网IP: 10.128.5.98 用户名口令: root/oneapm21 cat /etc/system-release cat /etc/i ...

  9. weblogic测试环境安装部署--傻瓜式安装教程

    测试环境weblogic部署手册 1.weblogic需要有jdk环境 1.1 通过xftp工具把jdk1.8的软件包传入到服务器的/usr/local中并解压 cd /usr/local tar - ...

最新文章

  1. Udacity机器人软件工程师课程笔记(三十三) - 蒙特卡洛定位算法(MCL)
  2. poj 3411(DFS多点访问)
  3. 用高精度方法计算n! ,并显示n!(阶乘)的值。
  4. java培训第一阶段测试总结,达内学员Java培训阶段总结:反躬自省,愈渐完美
  5. Ansible批量在远程主机执行命令
  6. 深入理解Java引用类型
  7. php函数嵌套 作用域,javascript 嵌套的函数(作用域链)_javascript技巧
  8. 解决:Throwable:Stub index points to a file without PSI: com.intellij.openapi.fileTypes.UnknownFileType
  9. java 类 加载 初始化_java类的加载与初始化
  10. mfix中统计气泡体积
  11. pandas numpy处理缺失值,none与nan比较
  12. CentOS+postfix+ExtMail+amavisd-new+Spam_Locker+DSpam配置指南:七、配置Webmail-ExtMail
  13. scala 高级十六 scala 集合和集合的高级特性 map flatten fllatmap zip reduce zip 等操作...
  14. “朝三暮四”与“BPO”
  15. CSDN首页 云计算 孙玄:解析58同城典型技术架构及演变
  16. 网络邻居上的计算机没权限,WinXP打开网上邻居提示“您可能没有权限使用网络资源”怎么办?...
  17. c# hook技术的实现
  18. git版本回退命令_git 版本回退 撤销 删除
  19. Hbase深入学习(五) 命令及查看状态
  20. Redis学习笔记(实战篇)(自用)

热门文章

  1. Shellshock
  2. 施努卡:机器视觉系统作用是什么,原理是什么
  3. zabbix二次开发,帮你快速上手
  4. 没钱的草根站长可以这样推广网站
  5. 写给1987—1990年出生的同学,生活在80后和90后夹缝中的一代
  6. 怎么提高计算机的桌面性能,Win7系统十大优化技巧让你的电脑一快再快
  7. java计算机毕业设计基于springboo+vue的汉服文化宣传活动交流网站(汉服社团)
  8. matlab入门命令分类集合——适合matlab初学者记忆整理
  9. 国内会议论文查重吗?
  10. 智能手势体感机械臂(基础一)