MQTT客户端监控连接状态事件

项目采用paho.mqtt.java客户端,需要监控连接状态变更事件,以进行异常维测和处理。

代码中提供了MqttCallback接口如下:

org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\MqttClient.java

public void setCallback(MqttCallback callback) {aClient.setCallback(callback);
}

其中MqttCallback 接口定义如下:

public interface MqttCallback {/** 连接断连* This method is called when the connection to the server is lost.** @param cause the reason behind the loss of connection.*/public void connectionLost(Throwable cause);/** 消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了* This method is called when a message arrives from the server.** <p>* This method is invoked synchronously by the MQTT client. An* acknowledgment is not sent back to the server until this* method returns cleanly.</p>* <p>* If an implementation of this method throws an <code>Exception</code>, then the* client will be shut down.  When the client is next re-connected, any QoS* 1 or 2 messages will be redelivered by the server.</p>* <p>* Any additional messages which arrive while an* implementation of this method is running, will build up in memory, and* will then back up on the network.</p>* <p>* If an application needs to persist data, then it* should ensure the data is persisted prior to returning from this method, as* after returning from this method, the message is considered to have been* delivered, and will not be reproducible.</p>* <p>* It is possible to send a new message within an implementation of this callback* (for example, a response to this message), but the implementation must not* disconnect the client, as it will be impossible to send an acknowledgment for* the message being processed, and a deadlock will occur.</p>** @param topic name of the topic on the message was published to* @param message the actual message.* @throws Exception if a terminal error has occurred, and the client should be* shut down.*/public void messageArrived(String topic, MqttMessage message) throws Exception;/** 消息publish完成回调* Called when delivery for a message has been completed, and all* acknowledgments have been received. For QoS 0 messages it is* called once the message has been handed to the network for* delivery. For QoS 1 it is called when PUBACK is received and* for QoS 2 when PUBCOMP is received. The token will be the same* token as that returned when the message was published.** @param token the delivery token associated with the message.*/public void deliveryComplete(IMqttDeliveryToken token);
}

可以通过connectionLost回调进行断连提示,但是重连的状态回调呢?

(说明:可以通过mqttClient.connect(mqttConnectOptions);中的

mqttConnectOptions.setAutomaticReconnect(true); // 自动重连

设置自动重连)

哈哈。paho.mqtt.java用了一个很有意思且巧妙的方法处理,如下:

/*** Extension of {@link MqttCallback} to allow new callbacks* without breaking the API for existing applications.* Classes implementing this interface can be registered on* both types of client: {@link IMqttClient#setCallback(MqttCallback)}* and {@link IMqttAsyncClient#setCallback(MqttCallback)}*/
public interface MqttCallbackExtended extends MqttCallback {/*** Called when the connection to the server is completed successfully.* @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/public void connectComplete(boolean reconnect, String serverURI);}使用时调用:mqttClient.setCallback(callback); 传入的callback采用 MqttCallbackExtended 的实现类就可以得到 connectComplete回调了。具体实现可以详见代码。
一句话:MqttCallbackExtended 应该是后期扩展的接口,为了保持接口不变及已有代码的功能,就对原有MqttCallback接口进行了继承扩展,内部实现检测到传入的callback也是MqttCallbackExtended 的实例,就会回调 connectComplete 接口了。
// If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
}

可以谓之为: “扩展模式”吧!

补充说明一点:MqttCallback 中的messageArrived回调是消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了

/* (non-Javadoc)* @see org.eclipse.paho.client.mqttv3.IMqttClient#subscribe(java.lang.String, int)*/
public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException {this.subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
}

具体原因:详见代码。

org.eclipse.paho.client.mqttv3-1.2.0-sources.jar!\org\eclipse\paho\client\mqttv3\internal\CommsCallback.java

protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
{     boolean delivered = false;Enumeration keys = callbacks.keys();while (keys.hasMoreElements()) {String topicFilter = (String)keys.nextElement();if (MqttTopic.isMatched(topicFilter, topicName)) {aMessage.setId(messageId);((IMqttMessageListener)(callbacks.get(topicFilter))).messageArrived(topicName, aMessage);delivered = true;}}/* if the message hasn't been delivered to a per subscription handler, give it to the default handler */if (mqttCallback != null && !delivered) {aMessage.setId(messageId);mqttCallback.messageArrived(topicName, aMessage);delivered = true;}return delivered;
}

OK。验证如下:

taskClient.getMqttClient().setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {AlLog.getLogger().warn("TaskClient connectComplete reconnect={} serverURI={}", reconnect, serverURI);}@Overridepublic void connectionLost(Throwable cause) {AlLog.getLogger().error("TaskClient connectionLost " + taskClient.isConnected() + " cause:", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {AlLog.getLogger().debug("TaskClient messageArrived message:{}", message);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {AlLog.getLogger().debug("TaskClient deliveryComplete token:{}", token);}
});

日志:

02-28 14:45:09.668 E/29252.64,MQTT Ping: devXXX SubscribeTask$1.connectionLost.41: TaskClient connectionLost false cause:
02-28 14:45:10.738 W/29252.63,MQTT Call: devXXX SubscribeTask$1.connectComplete.63: TaskClient connectComplete reconnect=true serverURI=tcp://XXX.XXX.XXX.XXX:1883
02-28 14:45:26.514 W/29252.35,Timer-0 SubscribeTask$3.run.93: TimerTask taskClient isConnected true

Java-MQTT客户端监控连接状态事件相关推荐

  1. eclipse paho java_如何使用Eclipse Paho在Java MQTT客户端上接收时发布消息

    我正在尝试使用 Eclipse Paho在Java中的MQTT客户端上实现某些功能.目标是订阅主题,并且当收到消息时,客户端发送关于另一主题的另一消息. 这看起来很容易,但我有一个奇怪的问题,我无法解 ...

  2. java mqtt客户端_MQTT消息队列遥测传输

    说实话这个折腾了我好久好久,我才知道,原来学习不是最痛苦的,学习却学不会才是最痛苦的事.生产者将消息发布到一个主题,消费者从该协议里读取数据,MQTT是为IoT物联网通信设计的协议,MQTT使物联网低 ...

  3. java mqtt客户端_基于 t-io 实现一个 mqtt5 协议之 mica-mqtt

    一.简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的"轻量级"消息协议,由 IBM 发布 ...

  4. java mqtt客户端_java 实现mqtt发送和接收消息客户端具体用法及测试代码

    注:客户端代码实现请看我的上一篇 1mqtt发送消息 发送时不用多讲,每次发送肯定需要运行一次发送消息方法 MyMqttClient mqttClient = new MyMqttClient(); ...

  5. 学习如何使用电脑客户端和ESP8266客户端来连接MQTT服务端以及订阅主题发送主题操作

    MQTT原理与应用 学习如何使用电脑客户端和ESP8266客户端来连接MQTT服务端 本文章学习借鉴于太极创客团队,以表感谢.官网[http://www.taichi-maker.com/] 文章目录 ...

  6. tcp 服务端如何判断客户端断开连接

    最近在做一个服务器端程序,C/S结构.功能方面比较简单就是client端与server端建立连接,然后发送消息给server.我在server端会使用专门的线程处理一条socket连接.这就涉及到一个 ...

  7. android paho框架,Android Mqtt 客户端paho使用心得

    Android mqtt 客户端实现一般使用以下两个库: implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1 ...

  8. mqtt协议产品化实现-华为鸿蒙实现mqtt客户端

    前面介绍的mosquitto项目[1]主要实现的是mqtt broker的功能,该小节介绍的物联网操作系统中的mqtt主要是实现客户端功能,因为角色的不同,所以在功能点上.架构上有很大的差异,所依赖的 ...

  9. java mqtt 断开连接,可以将MQTT Paho客户端断开连接吗?

    我有一个 MQTT 客户端(让我们称之为Client-1)使用java PAHO,这是pub并且没有问题的子主题,在地球的另一边我有另一个客户端(让我们称之为SuperClient),可以公共主题作为 ...

最新文章

  1. Linuxshell之高级Shell脚本编程-创建函数
  2. 使用Hyperledger Ursa简化区块链安全性
  3. 坚强生活(转)--To 小鱼,妹妹和傻女孩们
  4. linux的yun命令是访问互联网,如何在Linux终端中知道你的公有IP?
  5. stl swap函数_vector :: swap()函数以及C ++ STL中的示例
  6. 数据结构06树和二叉树
  7. 面向对象4大特性的作用
  8. 图的知识点总结-数据结构
  9. Linux报错:/etc/sudoers is world writable
  10. oracle里查询表的语句,Oracle查询用户所有表的语句
  11. Django主从数据库分离配置
  12. SAP License:O2O模式网站解决方案概述
  13. 2021年中国醋栗果提取物市场趋势报告、技术动态创新及2027年市场预测
  14. MS Navision专业BBS
  15. Day01 郝斌C语言自学视频之 C 语言概述
  16. 计算机机房装修效果图,机房布线效果图
  17. html5+植物大战僵尸,HTML5 Canvas植物大战僵尸 - 鳄梨射手
  18. FPGA-RAM读写测试
  19. 网卡驱动编写必读-重要概念 分享
  20. Sentence Embedding

热门文章

  1. 对于大型公司项目平台选择j2ee的几层认识(二)
  2. 山中老人 (updating)
  3. 大数据集群Linux环境配置
  4. 64位系统下同时使用64位和32位的eclipse
  5. zynq linux tf卡系统升级,zynq之TF卡写入常见问题
  6. html原生js实现图片轮播,原生JS实现图片轮播切换效果
  7. 五年从程序员到架构师的职业规划
  8. 关于微信防撤回(文本、图片、语音、视频、名片等...)的Python学习教程
  9. python网络爬虫从入门到实践 第5章 (一)
  10. E - EXCEL排序