EMQ X - EmqxBroker Window10环境安装部署以及发布订阅测试
在EMQ官网上拿张图哈^_^;;
本来就是在做物联网项目嘛,MQTT协议肯定是必须要的嘛,但之前不是我来负责这一块的,就没有对MQTT以及EMQ有更多的理解,只是会用能用罢了,要是让我说个1 2 3 ,肯定是不行的呀,最近有一个项目刚好我来对接开发,而且是MQTT协议的,由于受测试环境的限制,只能在本地笔记本上window上搭建一套EMQ环境来试试啦;
一、EMQX安装;
1、下载EMQ X Broker压缩包;这里放了官网最新版的https://www.emqx.io/downloads/broker/v4.1.0/emqx-windows-v4.1.0.zip;下面文章中的使用的是 “emqx-windows10-v3.2.0.zip”;
2、在笔记本本地目录直接解压就行啦;
3、进入解压后的目录;C:\OldData_Win7\emqx-windows10-v3.2.0\emqx\bin;
4、进入EMQX启动命令目录;按下shift键+鼠标右键,选择 ‘open PowerShell window here’;
5、启动EMQX服务;输入./emqx console;会弹出erlang的后台界面;
输入./emqx stop;即可停止服务啦;
输入./emqx start;即可轻松启动服务啦;(推荐)
6、启动成功后,EMQX自带的dashboard既可以访问啦;http://localhost:18083/#/login 默认u/p:admin/public
7、到这里我们的EMQ X Broker就已经可以使用了;
二、java连接EMQX,并发起订阅发布的操作;
1、因为客户端和服务端是都可以发布和订阅的;所以我们就以发布和订阅来区分吧;
订阅端sub:(启动main方法就可以连接到我们本地的EMQX了,哦,我这里使用了共享订阅的,random策略)
package com.daopin.project.mqtt;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class SubMsg0 {private static final Logger logger = LoggerFactory.getLogger(SubMsg0.class);//private static String topic = "$share/group/test1";//private static String topic = "$queue/test1";//private static String topic = "test1";private static int qos = 0;private static String broker = "tcp://127.0.0.1:1883";private static String userName = "COAP";private static String passWord = "coap";private static String clientId = "nokia-mqtt-cluster-0";/*** 有三种消息发布服务质量:* <p>* “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。* <p>* “至少一次”,确保消息到达,但消息重复可能会发生。* <p>* “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。*/private static MqttClient connect(String clientId) throws MqttException {MemoryPersistence persistence = new MemoryPersistence();MqttConnectOptions connOpts = new MqttConnectOptions();//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"};// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 这里设置为true表示每次连接到服务器都以新的身份连接(客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息)// 这里设置为false表示客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效connOpts.setCleanSession(true);connOpts.setUserName(userName);connOpts.setPassword(passWord.toCharArray());connOpts.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制connOpts.setKeepAliveInterval(20);connOpts.setAutomaticReconnect(true);connOpts.setMaxInflight(10);//connOpts.setServerURIs(uris);//setWill方法(遗嘱),如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息//connOpts.setWill(topic, "close".getBytes(), 2, true);// MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient mqttClient = new MqttClient(broker, clientId, persistence);mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectionLost(Throwable throwable) {//在断开连接时调用 连接丢失后,一般在这里面进行重连logger.warn("connectionLost ... ; We will do something ...");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {//接收已经预订的发布logger.info("topic - > " + topic + ", mqttMessage - > " + mqttMessage);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用logger.warn("deliveryComplete ... ; We will do something ...");}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*subscribe();*/ //连接成功,需要上传客户端所有的订阅关系logger.warn("connectComplete ... ; We will do something ...");}});mqttClient.connect(connOpts);return mqttClient;}public static void sub(MqttClient mqttClient, String topic) throws MqttException {int[] Qos = {qos};String[] topics = {topic};mqttClient.subscribe(topics, Qos);logger.info("sub >> " + topic);}private static void runsub(String clientId, String topic) throws MqttException {MqttClient mqttClient = connect(clientId);if (mqttClient != null) {sub(mqttClient, topic);}}public static void main(String[] args) throws MqttException {runsub(clientId, "$queue/qdq02mzl6kvs/coap-server/uplinkMsg");}
}
2、发布端pub;
package com.daopin.project.mqtt;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Date;
import java.util.concurrent.*;public class PubMsg {private static final Logger logger = LoggerFactory.getLogger(PubMsg.class);private static int qos = 0;private static String broker = "tcp://127.0.0.1:1883";private static String userName = "COAP";private static String passWord = "coap";public static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();public static ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());private static MqttClient connect(String clientId, String userName,String password) throws MqttException {MemoryPersistence persistence = new MemoryPersistence();MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());connOpts.setConnectionTimeout(10);connOpts.setKeepAliveInterval(20);//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};//connOpts.setServerURIs(uris); //起到负载均衡和高可用的作用MqttClient mqttClient = new MqttClient(broker, clientId, persistence);mqttClient.setCallback(new PushCallback("test"));mqttClient.connect(connOpts);return mqttClient;}private static void pub(MqttClient sampleClient, String msg, String topic)throws Exception {while (true){MqttMessage message = new MqttMessage((msg+" "+new Date()).getBytes());message.setQos(qos);message.setRetained(false);sampleClient.publish(topic, message);logger.info("pub-->" + message);Thread.sleep(3000L);}}private static void publish(String str, String clientId, String topic) {try {MqttClient mqttClient = connect(clientId, userName, passWord);if (mqttClient != null) {pub(mqttClient, str, topic);}if (mqttClient != null) {mqttClient.disconnect();}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws MqttException {//singleThreadPool.execute( ()-> publish("message content", "868333030030008", "sl6o3lbk94xg/868333030030008/uplinkMsg/0/data"));publish("AAAA0000", "nokia-mqtt-server", "qdq02mzl6kvs/coap-server/uplinkMsg");}
}class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);private String threadId;public PushCallback(String threadId) {this.threadId = threadId;}@Overridepublic void connectionLost(Throwable cause) {//在断开连接时调用 连接丢失后,一般在这里面进行重连logger.warn("connectionLost ... ; We will do something ...");}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {//System.out.println("deliveryComplete---------" + token.isComplete());//接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用logger.warn("deliveryComplete ... ; We will do something ...");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String msg = new String(message.getPayload());System.out.println(threadId + " " + msg);}
}
我们的订阅端也已经收到这条pub的数据了。。。
3、我们也可以在 dashboard来观察到这两个会话;
三、总结;
1、共享订阅
包含一个主题过滤器和订阅选项,唯一的区别在于共享订阅的主题过滤器格式必须是 $share/{ShareName}/{filter} 这种形式。这几个的字段的含义分别是:
$share 前缀表明这将是一个共享订阅
{ShareName} 是一个不包含 “/”, “+” 以及 “#” 的字符串。订阅会话通过使用相同的 {ShareName} 表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话
{filter} 即非共享订阅中的主题过滤器
共享订阅使得订阅端能够负载均衡地消费消息,但 MQTT 协议并没有规定 Server 应当使用什么负载均衡策略。作为参考,EMQ X 提供了 random, round_robin, sticky, hash 四种策略供用户自行选择。
2、connetcLost方法
我们最好要在这里做一些事情,抛出异常或者打印日志,这样你才能知道到底什么时候连接丢失了的
3、 消息发布服务质量
如何选择QoS:
QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别,比如在同一个子网内部的服务间的消息交互往往选用 QoS 0;而通过互联网的实时消息通信往往选用 QoS 1;QoS 2 使用的场景相对少一些,适合一些支付请求之类的要求较高的场景。
4、CleanSession参数;若是共享订阅模式下,需要将此字段配置为true,保证在订阅端有一个掉线的情况下,可以清除掉session信息,这样就不会收到订阅的pub信息了,避免了数据丢失的问题。
EMQ X - EmqxBroker Window10环境安装部署以及发布订阅测试相关推荐
- HoloLens开发环境安装部署(基于Unity2018.4/VS2017/MRTK)
HoloLens是微软的MR开发环境,网上有很多部署教程,我这里推一个博主的部署方法,大体上都差不多.用微软的官方平台把UWP程序部署到HoloLens眼镜里. HoloLens开发环境安装部署 一. ...
- java做flv直播服务器,EasyDSS流媒体服务器软件(支持RTMP/HLS/HTTP-FLV/视频点播/视频直播)-正式环境安装部署攻略...
EasyDSS流媒体服务器软件,提供一站式的转码.点播.直播.时移回放服务,极大地简化了开发和集成的工作. 其中,点播功能主要包含:上传.转码.分发.直播功能,主要包含:直播.录像, 直播支持RTMP ...
- Dynamic CRM9.0 环境安装部署手册步骤和遇到的一些问题解决方案(包含ADFS部署)
Dynamic CRM9.0 环境安装部署手册 Dynamic 365和ADFS配置安装过程踩了一些坑,拿出来和大家记录分享一下. 目录 Dynamic CRM9.0 环境安装部署手册 一.Activ ...
- Nexus环境安装部署及使用
目录 一.Nexus介绍 二.2.X环境安装部署 1.Nexus官网下载地址 2.解压到本地 3.修改Nexus端口(默认为8081) 4.开放防火墙 5.修改启动文件 6.启动 7.访问nexus ...
- 知识图谱实战开发案例剖析(22)Protege开发环境安装部署
一.前言 本文是系列博文<知识图谱实战开发案例剖析>第7部分:Protégé本体建模,第1节:Protégé环境安装部署.该系列内容已经录制成视频课程,发布在:网易云课堂. 二.正文 2. ...
- Kubernetes 生产环境安装部署 基于 Kubernetes v1.14.0 之 etcd集群
说明:没有明确注明在某台服务器,都是在k8s-operation 工作服务器完成 K8S node 节点数大于2000 节点 k8s-operation 目录规划,工作目录/apps/work/k8s ...
- linux环境安装部署mark
2019独角兽企业重金招聘Python工程师标准>>> 以下linux环境的安装部署,其中碰到不少坑,这里MARK下. linux运营一个网站,需要安装的东西基本如下: ====== ...
- PHP7.2与apache环境安装部署详细流程
php-agent 安装部署详细流程 一.环境 测试机内网IP: 10.128.5.98 用户名口令: root/oneapm21 cat /etc/system-release cat /etc/i ...
- weblogic测试环境安装部署--傻瓜式安装教程
测试环境weblogic部署手册 1.weblogic需要有jdk环境 1.1 通过xftp工具把jdk1.8的软件包传入到服务器的/usr/local中并解压 cd /usr/local tar - ...
最新文章
- Udacity机器人软件工程师课程笔记(三十三) - 蒙特卡洛定位算法(MCL)
- poj 3411(DFS多点访问)
- 用高精度方法计算n! ,并显示n!(阶乘)的值。
- java培训第一阶段测试总结,达内学员Java培训阶段总结:反躬自省,愈渐完美
- Ansible批量在远程主机执行命令
- 深入理解Java引用类型
- php函数嵌套 作用域,javascript 嵌套的函数(作用域链)_javascript技巧
- 解决:Throwable:Stub index points to a file without PSI: com.intellij.openapi.fileTypes.UnknownFileType
- java 类 加载 初始化_java类的加载与初始化
- mfix中统计气泡体积
- pandas numpy处理缺失值,none与nan比较
- CentOS+postfix+ExtMail+amavisd-new+Spam_Locker+DSpam配置指南:七、配置Webmail-ExtMail
- scala 高级十六 scala 集合和集合的高级特性 map flatten fllatmap zip reduce zip 等操作...
- “朝三暮四”与“BPO”
- CSDN首页 云计算 孙玄:解析58同城典型技术架构及演变
- 网络邻居上的计算机没权限,WinXP打开网上邻居提示“您可能没有权限使用网络资源”怎么办?...
- c# hook技术的实现
- git版本回退命令_git 版本回退 撤销 删除
- Hbase深入学习(五) 命令及查看状态
- Redis学习笔记(实战篇)(自用)