MQTT断线重连及订阅消息恢复
MQTT斷線重連及訂閱消息恢復
注意注意,MQTT重連后需要重新訂閱主題才能重新接收到消息
我這里使用的是
//設置斷開后重新連接 options.setAutomaticReconnect(true); @Overridepublic void connectionLost(Throwable throwable) {log.error("連接斷開,下面做重連...");long reconnectTimes = 1;while (true) {try {if (mqttClient.isConnected()) {log.warn("mqtt reconnect success end");break;}if(reconnectTimes == 10){//當重連次數達到10次時,就拋出異常,不在重連log.warn("mqtt reconnect error");return;}log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);mqttClient.reconnect();} catch (MqttException e) {log.error("", e);}try {Thread.sleep(1000);} catch (InterruptedException e1) { // e1.printStackTrace();}}}看MQTT的connec的源碼發現了一段代碼使我找到了解決方案
MqttAsyncClient 的 connect()方法
MqttReconnectCallback 是實現MqttCallbackExtended接口的
發現comms中有設置重連的回調對象
comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
但是怎么把這個回調由我們來主動放進去呢?繼續往下看源碼可以發現
MqttReconnectCallback對象只是在連接丟失connectionLost的時候進行循環連接
點擊startReconnectCycle()最終又會回到
MqttAsyncClient 的 connect()方法
也就是如果我們在之前放入client的回調對象是實現的 MqttCallbackExtended 接口,則MQTT會將我們的回調對象放入 connectActionListener 中 然后由 connectActionListener實現具體的connect
接下來我們將 MessageCallback 對象改為實現 MqttCallbackExtended這個接口,然后實現下面方法
mqttClient.setCallback(new MqttCallbackExtended () {/*** Called when the connection to the server is completed successfully.** @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {try{//如果監測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}log.info("----TAG", "connectComplete: 訂閱主題成功");}catch(Exception e){e.printStackTrace();log.info("----TAG", "error: 訂閱主題失敗");}}然后可能在同一個環境,比方測試服和本地,創建同ip端口,用戶密碼clientId一樣的客戶端,那么2邊會占用資源,需要加上異常報錯,我的處理方式是連接10次不行就讓他掉線,還需要在報錯的地方加上處理:
//當創建客戶端的時候出現 已斷開連接,有可能是在另一個環境下啟動了該客戶端,直接吧這邊的客戶端關閉,不然另一邊會無限重連if(e.getMessage().equals("已斷開連接") || e.getMessage().equals("客戶機未連接")){try {mqttClient.close();} catch (MqttException ex) {ex.printStackTrace();}}以下是我的開發完整代碼,使用了多線程方式創建,
package com.t4cloud.t.sensor.entity;import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;//MQTT客戶端線程 @Slf4j public class MqttClientThread extends Thread{//連接地址private String serverURL;//MQTT客戶端登錄用戶名private String mqttUsername;//MQTT客戶端密碼private String mqttPassWord;//MQTT訂閱主題private String mqttTopic;//MQTT的clientprivate String clientId;//產品idprivate String productId;//推送至我們自己的RedisTopIc中channelprivate String channel = "mqtt";//mqtt實體類private MqttClient mqttClient;//構造函數public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) {this.serverURL = serverURL;this.mqttUsername = mqttUsername;this.mqttPassWord = mqttPassWord;this.mqttTopic = mqttTopic;this.clientId = clientId;this.productId = productId;}//線程方法public void run(){try {// host為主機名,clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,// MemoryPersistence設置clientid的保存形式,默認為以內存保存,就用usernamemqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence());// 配置參數信息MqttConnectOptions options = new MqttConnectOptions();// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,// 這里設置為true表示每次連接到服務器都以新的身份連接options.setCleanSession(true);// 設置用戶名options.setUserName(mqttUsername);// 設置密碼options.setPassword(mqttPassWord.toCharArray());// 設置超時時間 單位為秒options.setConnectionTimeout(10);// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制 // options.setKeepAliveInterval(20);//設置斷開后重新連接options.setAutomaticReconnect(true);// 連接mqttClient.connect(options);// 訂閱//如果監測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}// 設置回調mqttClient.setCallback(new MqttCallbackExtended () {/*** Called when the connection to the server is completed successfully.** @param reconnect If true, the connection was the result of automatic reconnect.* @param serverURI The server URI that the connection was made to.*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {try{//如果監測到有,號,說明要訂閱多個主題if(mqttTopic.contains(",")){//多主題String[] mqttTopics = mqttTopic.split(",");mqttClient.subscribe(mqttTopics);}else{//單主題mqttClient.subscribe(mqttTopic);}log.info("----TAG", "connectComplete: 訂閱主題成功");}catch(Exception e){e.printStackTrace();log.info("----TAG", "error: 訂閱主題失敗");}}@Overridepublic void connectionLost(Throwable throwable) {log.error("連接斷開,下面做重連...");long reconnectTimes = 1;while (true) {try {if (mqttClient.isConnected()) {log.warn("mqtt reconnect success end");break;}if(reconnectTimes == 10){//當重連次數達到10次時,就拋出異常,不在重連log.warn("mqtt reconnect error");return;}log.warn("mqtt reconnect times = {} try again...", reconnectTimes++);mqttClient.reconnect();} catch (MqttException e) {log.error("", e);}try {Thread.sleep(1000);} catch (InterruptedException e1) { // e1.printStackTrace();}}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.info("接收消息主題 : " + topic);log.info("接收消息Qos : " + mqttMessage.getQos());log.info("接收消息內容 : " + new String(mqttMessage.getPayload()));//向我們通道中發送消息RedisMsg redisMsg = new RedisMsg();redisMsg.setChannel(channel);redisMsg.setMsg("推送MQTT消息");SensorMqttMsg mqttMsg = new SensorMqttMsg();mqttMsg.setProductId(productId);mqttMsg.setPayload(new String(mqttMessage.getPayload()));redisMsg.setData(mqttMsg);RedisTopicUtil.sendMessage(channel, redisMsg);}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {//認證過程log.info("deliveryComplete.............");}});//放入緩存,根據clinetId吧mqttClient對象放進去MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient);} catch (Exception e) {e.printStackTrace();//當創建客戶端的時候出現 已斷開連接,有可能是在另一個環境下啟動了該客戶端,直接吧這邊的客戶端關閉,不然另一邊會無限重連if(e.getMessage().equals("已斷開連接") || e.getMessage().equals("客戶機未連接")){try {mqttClient.close();} catch (MqttException ex) {ex.printStackTrace();}}}} }參考原文鏈接:https://blog.csdn.net/csdm_admin/article/details/119935243
總結
以上是生活随笔為你收集整理的MQTT断线重连及订阅消息恢复的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学Dapr Actors 看这篇就够了
- 下一篇: postgres中的中文分词zhpars