Mqtt 客户端 java API 教程
本文介紹MQTT消息,使用Eclipse Paho 庫(kù)作為MQTT java客戶(hù)端發(fā)送、接收消息。
MQTT 介紹
MQTT (MQ Telemetry Transport) 是一種消息協(xié)議,用于解決需要簡(jiǎn)單、輕量方法在低能耗設(shè)備間傳輸數(shù)據(jù),如在工業(yè)領(lǐng)域。隨著物聯(lián)網(wǎng)(IoT)設(shè)備的日益普及,MQTT的使用也越來(lái)越多,以致于OASIS宣布將MQTT(消息隊(duì)列遙測(cè)傳輸)作為新興的物聯(lián)網(wǎng)消息傳遞協(xié)議的首選標(biāo)準(zhǔn)。
該協(xié)議支持單一消息傳遞模式:發(fā)布-訂閱模式。客戶(hù)端發(fā)送的每個(gè)消息都包含一個(gè)關(guān)聯(lián)的“主題”,消息服務(wù)器使用該主題將消息路由到訂閱的客戶(hù)端。主題名稱(chēng)可以是簡(jiǎn)單的字符串,如“oiltemp”或類(lèi)似路徑的字符串“motor/1/rpm”。
消費(fèi)者為了接收消息,需用其明確的主題名稱(chēng)或包含支持通配符的字符串訂閱一個(gè)或多個(gè)主題(“#”表示多級(jí)主題,“+”表示單級(jí)主題)。
依賴(lài)庫(kù)
需要Paho 庫(kù)的 Maven 依賴(lài):
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version> </dependency>準(zhǔn)備客戶(hù)端
要使用Paho 庫(kù),手續(xù)需要實(shí)現(xiàn)IMattClient 接口,用于從MQTT服務(wù)端接收或發(fā)送消息。該接口包括所有方法,如建立連接、發(fā)送或接收消息等。
Paho 默認(rèn)提供了兩個(gè)IMattClient 接口的實(shí)現(xiàn),一個(gè)異步客戶(hù)端MqttAsyncClient、一個(gè)同步MqttClient。本文聚焦同步版本,它的語(yǔ)義相對(duì)簡(jiǎn)單。準(zhǔn)備客戶(hù)端需要兩個(gè)步驟,第一實(shí)例化MqttClient類(lèi),第二連接至服務(wù)器。下面詳細(xì)說(shuō)明。
創(chuàng)建MqttClient實(shí)例
下面代碼片段顯示如何創(chuàng)建IMqttClient同步實(shí)例:
String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);上面使用最簡(jiǎn)單的構(gòu)造函數(shù),一個(gè)服務(wù)端地址參數(shù),另一個(gè)客戶(hù)端表示(需唯一)。這里使用UUID確保每個(gè)客戶(hù)端不重復(fù),實(shí)際應(yīng)用中ID命名應(yīng)該有一定意義。
Paho還提供了其他的構(gòu)造函數(shù),我們可以使用它們來(lái)定制用于存儲(chǔ)未確認(rèn)消息的持久性機(jī)制和/或用于運(yùn)行協(xié)議引擎實(shí)現(xiàn)所需的后臺(tái)任務(wù)的ScheduledExecutorService。代碼中的服務(wù)器地址是Paho項(xiàng)目托管的公共MQTT代理,它允許任何有互聯(lián)網(wǎng)連接的人測(cè)試客戶(hù)機(jī),而不需要任何身份驗(yàn)證。
連接MQTT服務(wù)器
前面定義的MqttClient 實(shí)例并沒(méi)有連接至服務(wù)器,我們需要調(diào)用connect方法,可以傳入MqttConnectOptions實(shí)例作為參數(shù),指定協(xié)議的選項(xiàng),如指定用戶(hù)明和密碼、session恢復(fù)模式,重連接等。連接代碼如下:
public class MqttUtils {String publisherId = "t001";String url = "tcp://localhost:1883";IMqttClient mqttClient;public MqttUtils() {MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);options.setUserName("admin");options.setPassword("a123".toCharArray());try{mqttClient = new MqttClient(url, this.publisherId);mqttClient.connect(options);}catch (Exception e){e.printStackTrace();}} }上面連接選項(xiàng)解釋如下:
- 客戶(hù)端在遇到網(wǎng)絡(luò)問(wèn)題時(shí)會(huì)自動(dòng)重新連接
- 丟棄上一次運(yùn)行中未發(fā)送的消息
- 連接超時(shí)設(shè)置為10秒
發(fā)送消息
使用已經(jīng)連接的MqttClient發(fā)送消息非常簡(jiǎn)單。我們使用publish()方法的一個(gè)變體將有效負(fù)載(總是一個(gè)字節(jié)數(shù)組)發(fā)送到給定的主題,使用以下服務(wù)質(zhì)量選項(xiàng)之一:
- 0 -“最多一次”語(yǔ)義,也稱(chēng)為“發(fā)了就忘了”。當(dāng)可以接受消息丟失時(shí)使用此選項(xiàng),因?yàn)樗恍枰魏涡问降拇_認(rèn)或持久性
- 1 -“至少一次”語(yǔ)義。當(dāng)消息丟失不可接受且您的訂閱者可以處理副本時(shí),請(qǐng)使用此選項(xiàng)
- 2 -“恰好一次”語(yǔ)義。當(dāng)消息丟失不可接受且訂閱者無(wú)法處理副本時(shí),請(qǐng)使用此選項(xiàng)
在我們的示例中,EngineTemperatureSensor 類(lèi)扮演模擬傳感器的角色,每當(dāng)我們調(diào)用它的call()方法時(shí),它都會(huì)產(chǎn)生一個(gè)新的溫度讀數(shù)。
這個(gè)類(lèi)實(shí)現(xiàn)了Callable接口,所以我們可以很容易地將它與java.util.concurrent包中可用的ExecutorService實(shí)現(xiàn)類(lèi)一起使用:
MqttMessage封裝有效負(fù)載(消息體)、請(qǐng)求的服務(wù)質(zhì)量以及為消息保留的標(biāo)志。此標(biāo)志指示代理應(yīng)該保留此消息,直到訂閱者使用該消息為止。利用該特性可以實(shí)現(xiàn)當(dāng)新的訂閱者連接至服務(wù)器時(shí)(可能是同一客戶(hù)端斷開(kāi)了連接),會(huì)立刻接收到保留消息。
接收消息
為了接收服務(wù)器消息,需要使用subscribe()方法,可以指定下列參數(shù):
- 一個(gè)或多個(gè)主題過(guò)濾器
- 服務(wù)質(zhì)量QoS
- 回調(diào)方法用于處理接收消息
在下面的示例中,我們將展示如何向現(xiàn)有的IMqttClient實(shí)例添加消息偵聽(tīng)器,以接收來(lái)自給定主題的消息。我們使用CountDownLatch作為回調(diào)和主執(zhí)行線程之間的同步機(jī)制,每當(dāng)有新消息到達(dá)時(shí),就遞減它。
在示例代碼中使用了不同的IMqttClient實(shí)例來(lái)接收消息,這樣做只是為了更清楚哪個(gè)客戶(hù)端做什么。這不是Paho的限制-如果你愿意,你可以使用同一客戶(hù)端來(lái)發(fā)布和接收消息:
CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte[] payload = msg.getPayload();// ... payload handling omittedreceivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);上面調(diào)用subscribe()方法的subscriber變量將IMqttMessageListener實(shí)例作為它的第二個(gè)參數(shù)。我們使用簡(jiǎn)單的lambda函數(shù)來(lái)處理有效負(fù)載并減少計(jì)數(shù)器。如果在指定的時(shí)間窗口(1分鐘)內(nèi)沒(méi)有足夠的消息到達(dá),await()方法將拋出異常。
在使用Paho時(shí),我們不需要顯式地確認(rèn)收到消息。如果回調(diào)正常返回,Paho假定它是成功的消費(fèi),并向服務(wù)器發(fā)送一個(gè)確認(rèn)。
如果回調(diào)拋出異常,則客戶(hù)端將被關(guān)閉。請(qǐng)注意,這將導(dǎo)致在QoS級(jí)別為0時(shí)發(fā)送的任何消息丟失。當(dāng)客戶(hù)端重新連接并再次訂閱主題時(shí),以QoS級(jí)別1或2發(fā)送的消息將被服務(wù)器重發(fā)。
完整的測(cè)試接收代碼如下:
@Test public void whenSendMultipleMessages_thenSuccess() throws Exception {String publisherId = UUID.randomUUID().toString();MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);String subscriberId = UUID.randomUUID().toString();MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId);MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(10);publisher.connect(options); subscriber.connect(options);CountDownLatch receivedSignal = new CountDownLatch(10);subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte[] payload = msg.getPayload();log.info("[I82] Message received: topic={}, payload={}", topic, new String(payload));receivedSignal.countDown();});Callable<Void> target = new EngineTemperatureSensor(publisher);ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {target.call();}catch(Exception ex) {throw new RuntimeException(ex);}}, 1, 1, TimeUnit.SECONDS);receivedSignal.await(1, TimeUnit.MINUTES);executor.shutdown();assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero");log.info("[I105] Success !"); }總結(jié)
在本文中,我們演示了如何使用Eclipse Paho提供的庫(kù)在Java應(yīng)用程序中添加對(duì)MQTT協(xié)議的支持。該庫(kù)處理所有低級(jí)協(xié)議細(xì)節(jié),讓我們專(zhuān)注于解決方案的業(yè)務(wù)方面,同時(shí)留出良好的空間來(lái)定制其內(nèi)部特性,例如消息持久性。
總結(jié)
以上是生活随笔為你收集整理的Mqtt 客户端 java API 教程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java整人代码大全_整人代码大全.do
- 下一篇: 李宏毅机器学习Homework1(代码简