mqtt 异步消息 长连接 解析
mqtt 是輕量級(jí)基于代理的發(fā)布/訂閱的消息傳輸協(xié)議,設(shè)計(jì)思想是開放,簡單,輕量級(jí),且易于實(shí)現(xiàn),這些優(yōu)點(diǎn)使得他受用于任何環(huán)境
該協(xié)議的特點(diǎn)有:
?使用發(fā)布/訂閱消息的模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合
對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸
使用TCP/IO 提供的網(wǎng)絡(luò)連接
?
有三種消息發(fā)布服務(wù)質(zhì)量:
? "至多一次",消息發(fā)布完全依賴底層TCP/IP 網(wǎng)絡(luò),會(huì)發(fā)生消息丟失或者重復(fù),這一級(jí)別可用于如下情況,環(huán)境,傳感器數(shù)據(jù),丟失一次度記錄無所謂,因?yàn)椴痪弥髸?huì)有第二次發(fā)送
? "至少一次" 確保消息到達(dá),但消息重復(fù)可能發(fā)生
? “只有一次",確保消息到達(dá)一次,這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或者丟失導(dǎo)致不正確的結(jié)果
小型傳輸,開銷很小(固定床都的頭部是2個(gè)字節(jié)),協(xié)議變換最小化,以降低網(wǎng)絡(luò)流量
使用Last will和Testament 特性通知有關(guān)客戶端異常中斷的機(jī)制
?
1:配置環(huán)境
2:服務(wù)器端程序?
package com.example;import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** Description* 服務(wù)器向多個(gè)客戶端推送主題,即不同客戶端可向服務(wù)器訂閱相同主題* Company Beijing * author youxuan E-mail:xuanyouwu@163.com* date createTime:16/7/13* version*/ public class Server {public static final String HOST = "tcp://101.200.133.189:1883";public static final String TOPIC = "toclient/124";public static final String TOPIC125 = "toclient/125";private static final String clientid = "server";private MqttClient client;private MqttTopic topic;private MqttTopic topic125;private String userName = "admin";private String passWord = "password";private MqttMessage message;public Server() throws MqttException {// MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存client = new MqttClient(HOST, clientid, new MemoryPersistence());connect();}private void connect() {MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(false);options.setUserName(userName);options.setPassword(passWord.toCharArray());// 設(shè)置超時(shí)時(shí)間options.setConnectionTimeout(10);// 設(shè)置會(huì)話心跳時(shí)間options.setKeepAliveInterval(20);try {client.setCallback(new PushCallback());client.connect(options);topic = client.getTopic(TOPIC);topic125 = client.getTopic(TOPIC125);} catch (Exception e) {e.printStackTrace();}}public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,MqttException {MqttDeliveryToken token = topic.publish(message);token.waitForCompletion();System.out.println("message is published completely! "+ token.isComplete());}public static void main(String[] args) throws MqttException {Server server = new Server();server.message = new MqttMessage();server.message.setQos(2);server.message.setRetained(true);server.message.setPayload("給客戶端124推送的信息:wuyouxuan".getBytes());server.publish(server.topic, server.message);server.message = new MqttMessage();server.message.setQos(2);server.message.setRetained(true);server.message.setPayload("給客戶端125推送的信息:wuyouxuan".getBytes());server.publish(server.topic125, server.message);System.out.println(server.message.isRetained() + "------ratained狀態(tài)");} }?
回調(diào)監(jiān)聽? package com.example;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * 發(fā)布消息的回調(diào)類 * * 必須實(shí)現(xiàn)MqttCallback的接口并實(shí)現(xiàn)對(duì)應(yīng)的相關(guān)接口方法CallBack 類將實(shí)現(xiàn) MqttCallBack。 * 每個(gè)客戶機(jī)標(biāo)識(shí)都需要一個(gè)回調(diào)實(shí)例。在此示例中,構(gòu)造函數(shù)傳遞客戶機(jī)標(biāo)識(shí)以另存為實(shí)例數(shù)據(jù)。* 在回調(diào)中,將它用來標(biāo)識(shí)已經(jīng)啟動(dòng)了該回調(diào)的哪個(gè)實(shí)例。 * 必須在回調(diào)類中實(shí)現(xiàn)三個(gè)方法: * * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預(yù)訂的發(fā)布。 * * public void connectionLost(Throwable cause)在斷開連接時(shí)調(diào)用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時(shí)調(diào)用。 * 由 MqttClient.connect 激活此回調(diào)。 * */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進(jìn)行重連 System.out.println("連接斷開,可以做重連"); } public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete()); }public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會(huì)執(zhí)行到這里面 System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內(nèi)容 : " + new String(message.getPayload())); } }3:客戶端程序
package com.example;import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.ScheduledExecutorService;/*** Description* 客戶端程序* Company Beijing * author youxuan E-mail:xuanyouwu@163.com* date createTime:16/7/13* version*/ public class Client {public static final String HOST = "tcp://101.200.133.189:1883";public static final String TOPIC = "toclient/124"; private static final String clientid = "client124"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin";private String passWord = "password";private ScheduledExecutorService scheduler; private void start() { try { // host為主機(jī)名,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設(shè)置 options = new MqttConnectOptions(); // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 options.setCleanSession(true); // 設(shè)置連接的用戶名 options.setUserName(userName); // 設(shè)置連接的密碼 options.setPassword(passWord.toCharArray()); // 設(shè)置超時(shí)時(shí)間 單位為秒 options.setConnectionTimeout(10); // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制 options.setKeepAliveInterval(20); // 設(shè)置回調(diào) client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息 options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { Client client = new Client(); client.start(); } }參考網(wǎng)站:
https://segmentfault.com/a/1190000002809450#articleHeader0
github 開源項(xiàng)目 ??https://github.com/fusesource/mqtt-client
運(yùn)行效果:
轉(zhuǎn)載于:https://www.cnblogs.com/jiangzhaowei/p/9152646.html
總結(jié)
以上是生活随笔為你收集整理的mqtt 异步消息 长连接 解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 嵌入式linux系统,给WIFI模块增加
- 下一篇: Windows Subsystem fo