



public void setCallback(MqttCallback callback) {aClient.setCallback(callback);

其中MqttCallback 接口定义如下:

public interface MqttCallback {/** 连接断连* This method is called when the connection to the server is lost.** @param cause the reason behind the loss of connection.*/public void connectionLost(Throwable cause);/** 消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了* This method is called when a message arrives from the server.** <p>* This method is invoked synchronously by the MQTT client. An* acknowledgment is not sent back to the server until this* method returns cleanly.</p>* <p>* If an implementation of this method throws an <code>Exception</code>, then the* client will be shut down.  When the client is next re-connected, any QoS* 1 or 2 messages will be redelivered by the server.</p>* <p>* Any additional messages which arrive while an* implementation of this method is running, will build up in memory, and* will then back up on the network.</p>* <p>* If an application needs to persist data, then it* should ensure the data is persisted prior to returning from this method, as* after returning from this method, the message is considered to have been* delivered, and will not be reproducible.</p>* <p>* It is possible to send a new message within an implementation of this callback* (for example, a response to this message), but the implementation must not* disconnect the client, as it will be impossible to send an acknowledgment for* the message being processed, and a deadlock will occur.</p>** @param topic name of the topic on the message was published to* @param message the actual message.* @throws Exception if a terminal error has occurred, and the client should be* shut down.*/public void messageArrived(String topic, MqttMessage message) throws Exception;/** 消息publish完成回调* Called when delivery for a message has been completed, and all* acknowledgments have been received. For QoS 0 messages it is* called once the message has been handed to the network for* delivery. For QoS 1 it is called when PUBACK is received and* for QoS 2 when PUBCOMP is received. The token will be the same* token as that returned when the message was published.** @param token the delivery token associated with the message.*/public void deliveryComplete(IMqttDeliveryToken token);



mqttConnectOptions.setAutomaticReconnect(true); // 自动重连



/*** Extension of {@link MqttCallback} to allow new callbacks* without breaking the API for existing applications.* Classes implementing this interface can be registered on* both types of client: {@link IMqttClient#setCallback(MqttCallback)}* and {@link IMqttAsyncClient#setCallback(MqttCallback)}*/
public interface MqttCallbackExtended extends MqttCallback {/*** Called when the connection to the server is completed successfully.* @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/public void connectComplete(boolean reconnect, String serverURI);}使用时调用:mqttClient.setCallback(callback); 传入的callback采用 MqttCallbackExtended 的实现类就可以得到 connectComplete回调了。具体实现可以详见代码。
一句话:MqttCallbackExtended 应该是后期扩展的接口,为了保持接口不变及已有代码的功能,就对原有MqttCallback接口进行了继承扩展,内部实现检测到传入的callback也是MqttCallbackExtended 的实例,就会回调 connectComplete 接口了。
// If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);

可以谓之为: “扩展模式”吧!

补充说明一点:MqttCallback 中的messageArrived回调是消息收到回调的缺省回调,如果subscribe(String topicFilter, int qos, IMqttMessageListener messageListener)指定了 IMqttMessageListener 会覆盖该回调,也即:使用IMqttMessageListener 回调,MqttCallback 中的messageArrived回调无响应了

/* (non-Javadoc)* @see org.eclipse.paho.client.mqttv3.IMqttClient#subscribe(java.lang.String, int)*/
public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException {this.subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});



protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception
{     boolean delivered = false;Enumeration keys = callbacks.keys();while (keys.hasMoreElements()) {String topicFilter = (String)keys.nextElement();if (MqttTopic.isMatched(topicFilter, topicName)) {aMessage.setId(messageId);((IMqttMessageListener)(callbacks.get(topicFilter))).messageArrived(topicName, aMessage);delivered = true;}}/* if the message hasn't been delivered to a per subscription handler, give it to the default handler */if (mqttCallback != null && !delivered) {aMessage.setId(messageId);mqttCallback.messageArrived(topicName, aMessage);delivered = true;}return delivered;


taskClient.getMqttClient().setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {AlLog.getLogger().warn("TaskClient connectComplete reconnect={} serverURI={}", reconnect, serverURI);}@Overridepublic void connectionLost(Throwable cause) {AlLog.getLogger().error("TaskClient connectionLost " + taskClient.isConnected() + " cause:", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {AlLog.getLogger().debug("TaskClient messageArrived message:{}", message);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {AlLog.getLogger().debug("TaskClient deliveryComplete token:{}", token);}


02-28 14:45:09.668 E/29252.64,MQTT Ping: devXXX SubscribeTask$1.connectionLost.41: TaskClient connectionLost false cause:
02-28 14:45:10.738 W/29252.63,MQTT Call: devXXX SubscribeTask$1.connectComplete.63: TaskClient connectComplete reconnect=true serverURI=tcp://XXX.XXX.XXX.XXX:1883
02-28 14:45:26.514 W/29252.35,Timer-0 SubscribeTask$ TimerTask taskClient isConnected true


