简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。

MQTT原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

MQTT在Java中应用

MQTT的应用一般需要MQTT服务器,比如mosquito或EMQX服务软件,MQTT客户端和代理服务器可以使用代码实现。

Java客户端开发

<dependency>           <groupId>org.eclipse.paho</groupId>  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>            <version>1.2.0</version>
</dependency>

MQTTClient.java文件
单线程:

package com.t4cloud.t.sensor.handler;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class DemoMqttClient {public static void main(String... args) {try {// host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,// MemoryPersistence设置clientid的保存形式,默认为以内存保存MqttClient mqttClient = new MqttClient("tcp://47.98.137.173:10001", "client", new MemoryPersistence());// 配置参数信息MqttConnectOptions options = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(true);// 设置用户名options.setUserName("mu88KCb9");// 设置密码options.setPassword("J0x24wd066Y3q778".toCharArray());// 设置超时时间 单位为秒options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制options.setKeepAliveInterval(20);// 连接mqttClient.connect(options);// 订阅mqttClient.subscribe("test");// 设置回调mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {// 连接失败时调用  重新连接订阅System.out.println("连接丢失.............");try {System.out.println("开始重连");Thread.sleep(3000);mqttClient.connect(options);} catch (InterruptedException e) {e.printStackTrace();} catch (MqttSecurityException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.info("接收消息主题 : " + topic);log.info("接收消息Qos : " + mqttMessage.getQos());log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//认证过程log.info("deliveryComplete.............");}});// 创建消息MqttMessage message = new MqttMessage("hello World!".getBytes());// 设置消息的服务质量message.setQos(0);// 发布消息mqttClient.publish("test", message);// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();} catch (Exception e) {e.printStackTrace();}}
}
如果你要在一个项目中启动多个客户端,那么可以多线程的方式创建

多线程:

package com.t4cloud.t.sensor.entity;import com.t4cloud.t.base.redis.topic.entity.RedisMsg;
import com.t4cloud.t.base.utils.RedisTopicUtil;
import com.t4cloud.t.sensor.constant.MqttClientManager;
import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;//MQTT客户端线程
@Slf4j
public class MqttClientThread extends Thread{//连接地址private String serverURL;//MQTT客户端登录用户名private String mqttUsername;//MQTT客户端密码private String mqttPassWord;//MQTT订阅主题private String mqttTopic;//MQTT的clientprivate String clientId;//产品idprivate String productId;//推送至我们自己的RedisTopIc中channelprivate String channel = "mqtt";//mqtt实体类private MqttClient mqttClient;//构造函数public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) {this.serverURL = serverURL;this.mqttUsername = mqttUsername;this.mqttPassWord = mqttPassWord;this.mqttTopic = mqttTopic;this.clientId = clientId;this.productId = productId;}//线程方法public void run(){try {// host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,// MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用usernamemqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence());// 配置参数信息MqttConnectOptions options = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(true);// 设置用户名options.setUserName(mqttUsername);// 设置密码options.setPassword(mqttPassWord.toCharArray());// 设置超时时间 单位为秒options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
//            options.setKeepAliveInterval(20);//设置断开后重新连接options.setAutomaticReconnect(true);// 连接mqttClient.connect(options);// 订阅//如果监测到有,号,说明要订阅多个主题if(mqttTopic.contains(",")){//多主题String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//单主题mqttClient.subscribe(mqttTopic);}// 设置回调mqttClient.setCallback(new MqttCallbackExtended () {/*** Called when the connection to the server is completed successfully.** @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {try{//如果监测到有,号,说明要订阅多个主题if(mqttTopic.contains(",")){//多主题String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//单主题mqttClient.subscribe(mqttTopic);}log.info("----TAG", "connectComplete: 订阅主题成功");}catch(Exception e){e.printStackTrace();log.info("----TAG", "error: 订阅主题失败");}}@Overridepublic void connectionLost(Throwable throwable) {log.error("连接断开,下面做重连...");long reconnectTimes = 1;while (true) {try {if (mqttClient.isConnected()) {log.warn("mqtt reconnect success end");break;}if(reconnectTimes == 10){//当重连次数达到10次时,就抛出异常,不在重连log.warn("mqtt reconnect error");return;}log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);mqttClient.reconnect();} catch (MqttException e) {log.error("", e);}try {Thread.sleep(1000);} catch (InterruptedException e1) {
//                            e1.printStackTrace();}}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.info("接收消息主题 : " + topic);log.info("接收消息Qos : " + mqttMessage.getQos());log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));//向我们通道中发送消息RedisMsg redisMsg = new RedisMsg();redisMsg.setChannel(channel);redisMsg.setMsg("推送MQTT消息");SensorMqttMsg mqttMsg = new SensorMqttMsg();mqttMsg.setProductId(productId);mqttMsg.setPayload(new String(mqttMessage.getPayload()));redisMsg.setData(mqttMsg);RedisTopicUtil.sendMessage(channel, redisMsg);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//认证过程log.info("deliveryComplete.............");}});//放入缓存,根据clinetId吧mqttClient对象放进去MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient);} catch (Exception e) {e.printStackTrace();//当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){try {mqttClient.close();} catch (MqttException ex) {ex.printStackTrace();}}}}
}

这里面吧MqttClient放入了ConcurrentHashMap中,然后后面需要关闭的时候重ConcurrentHashMap拿出MqttClient对象使用如下代码关闭就可以了。

//断开连接
client.disconnect();
//关闭连接
client.close();

其次还解决了 MQTT断线重连及订阅消息恢复 问题。

java 实现MQTT客户端相关推荐

  1. 使用java开发MQTT客户端接收消息

    1.搭建好MQTT服务器,可以参考EMQX企业版试用笔记_Three Big Stones的博客-CSDN博客 2.Eclipse下新建一个Maven项目,并引入mqtt客户端java版本包. < ...

  2. java 整合MQTT客户端和Windows安装服务端1

    windows服务端安装 服务端下载地址 选择自己需要的版本下载即可,我用的是windows测试用的 解压完成之后 在bin目录下执行 emqx console,尽量不要用 emqx start因为报 ...

  3. eclipse paho java_如何使用Eclipse Paho在Java MQTT客户端上接收时发布消息

    我正在尝试使用 Eclipse Paho在Java中的MQTT客户端上实现某些功能.目标是订阅主题,并且当收到消息时,客户端发送关于另一主题的另一消息. 这看起来很容易,但我有一个奇怪的问题,我无法解 ...

  4. 使用java实现MQTT协议客户端的接收、发布消息和订阅、退订主题topic

    记录一下我实习的第一个任务,学习MQTT协议 首先呢得了解MQTT是什么,这里推荐一个我学习MQTT的中文文档 MQTT协议的基于TCP/IP协议的一个物联网协议,有几个概念必须要弄懂得主题(topi ...

  5. MQTT客户端库-Paho GO

    为了加深理解,本文是翻译文章.原文地址 Paho GO Client   语言 GO 协议 EPL AND EDL 官网地址 http://www.eclipse.org/paho/ API类型 As ...

  6. java版mosquitto客户端使用SSL功能的具体操作总结

    在开发java版mosquitto客户端程序时需要使用paho库,如果开发的java客户端要用ssl功能,则需要Bouncy Castle库:在使用ssl功能时,需要证书文件进行进行身份认证,但在测试 ...

  7. MQTT 客户端收发 MQTT 消息

    本文主要介绍如何使用 MQTT 客户端收发 MQTT 消息,并给出示例代码供前期开发测试参考,包括资源创建.环境准备.示例代码.注意事项等. 注意: 本文给出的实例均基于 Eclipse Paho J ...

  8. Android使用的MQTT客户端

    Android使用的MQTT客户端,支持订阅.发送消息: 支持创建连接到本地保存: 支持话题消息筛选: 使用视频:https://dwz.cn/undJFEnq 小米应用商店也有 [蘑菇IoT]~ 核 ...

  9. java连接MQTT服务器(Springboot整合MQTT)

    一.业务场景 硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收.解析.业务处理.存储入库.数据展示. MQTT 是基于 发布(Publish)/ ...

  10. 用emqx做mqtt客户端

    最近项目中有一个需求,要用mqtt协议接收路侧设备的数据到云平台上,所以,研究了一下mqtt客户端的制作方法. mqtt协议是一个发布订阅模式的协议. 这篇文章主要记录下我搭建mqttbroker和写 ...

最新文章

  1. 关于I2C协议和SPI协议学习之中的补充修改
  2. 实事求实来看综合布线网络
  3. 企业管理系统java web_JavaWeb 基于 web的 企业人事管理系统 Jsp/Servlet 242万源代码下载- www.pudn.com...
  4. kindle词典_kindle盖泡面是什么梗?kindle官方盖章泡面盖子 网友:定位准确
  5. AAAI 2021 | 幻灯片中文字的重要性预测赛亚军DeepBlueAI团队技术分享
  6. 2019-03-09-算法-进化(从排序数组中删除重复项)
  7. java 分贝_java11教程--jhsdb命令
  8. LeetCode 1120. 子树的最大平均值(DFS自底向上)
  9. java8 hadoop_java8-模拟hadoop
  10. 八个led闪烁c语言程序,闪烁的LED
  11. pandas25 if else语句(多数据df选择)( tcy)
  12. C语言洛谷P1957口算练习题
  13. 归档日志路径三个参数DB_RECOVERY_FILE_DEST和LOG_ARCHIVE_DEST和LOG_ARCHIVE_DEST_n
  14. 2019 计蒜之道 初赛 第一场 商汤的AI伴游小精灵
  15. AI智能视频分析平台EasyCVR视频直播弹框关闭后再次打开视频未能从起始时间播放的问题优化
  16. 华东师范大学 数据科学与工程 829 2021考研真题
  17. namp安装及官方使用手册翻译及注释1
  18. 五款轻量型bug管理工具横向测评
  19. 一文读懂通达信指标公式
  20. 云服务HCIE变题当天一把过!分享下学习备考和考试经验

热门文章

  1. CE教程:植物大战僵尸(金币数值修改)
  2. 三星k3梅林没有软件中心_斐讯K3梅林软件中心版刷机包 修正WAN口 修正2.4G 自动息屏 完美混血...
  3. 网络工程师——正则表达式(模糊匹配)
  4. 如何下载Chrome谷歌浏览器历史版本
  5. Unity自定义UI组件(八) 颜色拾取器(上)
  6. spring源码之下载及构建
  7. 如何汉化并编译 Python 源代码
  8. PCB SI9000阻抗计算引擎Web方式实现方法
  9. SI 9000 阻抗计算笔记
  10. python爬虫: 爬一个英语学习网站