Java-MQTT客户端监控连接状态事件
MQTT客户端监控连接状态事件
项目采用paho.mqtt.java客户端,需要监控连接状态变更事件,以进行异常维测和处理。
代码中提供了MqttCallback接口如下:
org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\MqttClient.java
public void setCallback(MqttCallback callback) {aClient.setCallback(callback);
}
其中MqttCallback 接口定义如下:
public interface MqttCallback {/** 连接断连* This method is called when the connection to the server is lost.** @param cause the reason behind the loss of connection.*/public void connectionLost(Throwable cause);/** 消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了* This method is called when a message arrives from the server.** <p>* This method is invoked synchronously by the MQTT client. An* acknowledgment is not sent back to the server until this* method returns cleanly.</p>* <p>* If an implementation of this method throws an <code>Exception</code>, then the* client will be shut down. When the client is next re-connected, any QoS* 1 or 2 messages will be redelivered by the server.</p>* <p>* Any additional messages which arrive while an* implementation of this method is running, will build up in memory, and* will then back up on the network.</p>* <p>* If an application needs to persist data, then it* should ensure the data is persisted prior to returning from this method, as* after returning from this method, the message is considered to have been* delivered, and will not be reproducible.</p>* <p>* It is possible to send a new message within an implementation of this callback* (for example, a response to this message), but the implementation must not* disconnect the client, as it will be impossible to send an acknowledgment for* the message being processed, and a deadlock will occur.</p>** @param topic name of the topic on the message was published to* @param message the actual message.* @throws Exception if a terminal error has occurred, and the client should be* shut down.*/public void messageArrived(String topic, MqttMessage message) throws Exception;/** 消息publish完成回调* Called when delivery for a message has been completed, and all* acknowledgments have been received. For QoS 0 messages it is* called once the message has been handed to the network for* delivery. For QoS 1 it is called when PUBACK is received and* for QoS 2 when PUBCOMP is received. The token will be the same* token as that returned when the message was published.** @param token the delivery token associated with the message.*/public void deliveryComplete(IMqttDeliveryToken token);
}
可以通过connectionLost回调进行断连提示,但是重连的状态回调呢?
(说明:可以通过mqttClient.connect(mqttConnectOptions);中的
mqttConnectOptions.setAutomaticReconnect(true); // 自动重连
设置自动重连)
哈哈。paho.mqtt.java用了一个很有意思且巧妙的方法处理,如下:
/*** Extension of {@link MqttCallback} to allow new callbacks* without breaking the API for existing applications.* Classes implementing this interface can be registered on* both types of client: {@link IMqttClient#setCallback(MqttCallback)}* and {@link IMqttAsyncClient#setCallback(MqttCallback)}*/
public interface MqttCallbackExtended extends MqttCallback {/*** Called when the connection to the server is completed successfully.* @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/public void connectComplete(boolean reconnect, String serverURI);}使用时调用:mqttClient.setCallback(callback); 传入的callback采用 MqttCallbackExtended 的实现类就可以得到 connectComplete回调了。具体实现可以详见代码。
一句话:MqttCallbackExtended 应该是后期扩展的接口,为了保持接口不变及已有代码的功能,就对原有MqttCallback接口进行了继承扩展,内部实现检测到传入的callback也是MqttCallbackExtended 的实例,就会回调 connectComplete 接口了。
// If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
}
可以谓之为: “扩展模式”吧!
补充说明一点:MqttCallback 中的messageArrived回调是消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了
/* (non-Javadoc)* @see org.eclipse.paho.client.mqttv3.IMqttClient#subscribe(java.lang.String, int)*/
public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException {this.subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
}
具体原因:详见代码。
org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\internal\CommsCallback.java
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
{ boolean delivered = false;Enumeration keys = callbacks.keys();while (keys.hasMoreElements()) {String topicFilter = (String)keys.nextElement();if (MqttTopic.isMatched(topicFilter, topicName)) {aMessage.setId(messageId);((IMqttMessageListener)(callbacks.get(topicFilter))).messageArrived(topicName, aMessage);delivered = true;}}/* if the message hasn't been delivered to a per subscription handler, give it to the default handler */if (mqttCallback != null && !delivered) {aMessage.setId(messageId);mqttCallback.messageArrived(topicName, aMessage);delivered = true;}return delivered;
}
OK。验证如下:
taskClient.getMqttClient().setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {AlLog.getLogger().warn("TaskClient connectComplete reconnect={} serverURI={}", reconnect, serverURI);}@Overridepublic void connectionLost(Throwable cause) {AlLog.getLogger().error("TaskClient connectionLost " + taskClient.isConnected() + " cause:", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {AlLog.getLogger().debug("TaskClient messageArrived message:{}", message);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {AlLog.getLogger().debug("TaskClient deliveryComplete token:{}", token);}
});
日志:
02-28 14:45:09.668 E/29252.64,MQTT Ping: devXXX SubscribeTask$1.connectionLost.41: TaskClient connectionLost false cause:
02-28 14:45:10.738 W/29252.63,MQTT Call: devXXX SubscribeTask$1.connectComplete.63: TaskClient connectComplete reconnect=true serverURI=tcp://XXX.XXX.XXX.XXX:1883
02-28 14:45:26.514 W/29252.35,Timer-0 SubscribeTask$3.run.93: TimerTask taskClient isConnected true
Java-MQTT客户端监控连接状态事件相关推荐
- eclipse paho java_如何使用Eclipse Paho在Java MQTT客户端上接收时发布消息
我正在尝试使用 Eclipse Paho在Java中的MQTT客户端上实现某些功能.目标是订阅主题,并且当收到消息时,客户端发送关于另一主题的另一消息. 这看起来很容易,但我有一个奇怪的问题,我无法解 ...
- java mqtt客户端_MQTT消息队列遥测传输
说实话这个折腾了我好久好久,我才知道,原来学习不是最痛苦的,学习却学不会才是最痛苦的事.生产者将消息发布到一个主题,消费者从该协议里读取数据,MQTT是为IoT物联网通信设计的协议,MQTT使物联网低 ...
- java mqtt客户端_基于 t-io 实现一个 mqtt5 协议之 mica-mqtt
一.简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的"轻量级"消息协议,由 IBM 发布 ...
- java mqtt客户端_java 实现mqtt发送和接收消息客户端具体用法及测试代码
注:客户端代码实现请看我的上一篇 1mqtt发送消息 发送时不用多讲,每次发送肯定需要运行一次发送消息方法 MyMqttClient mqttClient = new MyMqttClient(); ...
- 学习如何使用电脑客户端和ESP8266客户端来连接MQTT服务端以及订阅主题发送主题操作
MQTT原理与应用 学习如何使用电脑客户端和ESP8266客户端来连接MQTT服务端 本文章学习借鉴于太极创客团队,以表感谢.官网[http://www.taichi-maker.com/] 文章目录 ...
- tcp 服务端如何判断客户端断开连接
最近在做一个服务器端程序,C/S结构.功能方面比较简单就是client端与server端建立连接,然后发送消息给server.我在server端会使用专门的线程处理一条socket连接.这就涉及到一个 ...
- android paho框架,Android Mqtt 客户端paho使用心得
Android mqtt 客户端实现一般使用以下两个库: implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1 ...
- mqtt协议产品化实现-华为鸿蒙实现mqtt客户端
前面介绍的mosquitto项目[1]主要实现的是mqtt broker的功能,该小节介绍的物联网操作系统中的mqtt主要是实现客户端功能,因为角色的不同,所以在功能点上.架构上有很大的差异,所依赖的 ...
- java mqtt 断开连接,可以将MQTT Paho客户端断开连接吗?
我有一个 MQTT 客户端(让我们称之为Client-1)使用java PAHO,这是pub并且没有问题的子主题,在地球的另一边我有另一个客户端(让我们称之为SuperClient),可以公共主题作为 ...
最新文章
- Linuxshell之高级Shell脚本编程-创建函数
- 使用Hyperledger Ursa简化区块链安全性
- 坚强生活(转)--To 小鱼,妹妹和傻女孩们
- linux的yun命令是访问互联网,如何在Linux终端中知道你的公有IP?
- stl swap函数_vector :: swap()函数以及C ++ STL中的示例
- 数据结构06树和二叉树
- 面向对象4大特性的作用
- 图的知识点总结-数据结构
- Linux报错:/etc/sudoers is world writable
- oracle里查询表的语句,Oracle查询用户所有表的语句
- Django主从数据库分离配置
- SAP License:O2O模式网站解决方案概述
- 2021年中国醋栗果提取物市场趋势报告、技术动态创新及2027年市场预测
- MS Navision专业BBS
- Day01 郝斌C语言自学视频之 C 语言概述
- 计算机机房装修效果图,机房布线效果图
- html5+植物大战僵尸,HTML5 Canvas植物大战僵尸 - 鳄梨射手
- FPGA-RAM读写测试
- 网卡驱动编写必读-重要概念 分享
- Sentence Embedding