Springboot整合mqtt客户端,实现客户端之间的交互(mqtt服务器为apollo1.7)【windows下]】
目錄
- 什么是mqtt
- 簡介
- 特性
- 實現方式
- apollo1.7服務器的下載和使用
- 下載
- 使用
- Springboot整合mqtt客戶端
- Springboot和mqtt的依賴
- 客戶端代碼
- Controller代碼
- 啟動類
- 演示
- 連接apollo服務器
- 客戶端之間的交互
- 遇到的問題
- 源碼
什么是mqtt
簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基于發布/訂閱(publish/subscribe)模式的輕量級協議,該協議構建于TCP/IP協議之上,MQTT最大優點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基于客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議 是輕量、簡單、開放和易于實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
特性
MQTT協議工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:
(1)使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
(2)對負載內容屏蔽的消息傳輸。
(3)使用TCP/IP提供網絡連接。
主流的MQTT是基于TCP連接進行數據推送的,但是同樣有基于UDP的版本,叫做MQTT-SN。這兩種版本由于基于不同的連接方式,優缺點自然也就各有不同了。
(4)有三種消息發布服務質量:
“至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
“至少一次”,確保消息到達,但消息重復可能會發生。
“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。
(5)小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。
這就是為什么在介紹里說它非常適合“在物聯網領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協議來傳遞消息再適合不過了。
實現方式
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
(2)payload(message),可以理解為消息的內容,是指訂閱者具體要使用的內容。
apollo1.7服務器的下載和使用
下載
apache官網對于apollo已經不更新了,我傳上來了,大家可以去我這里,0積分就能下。
使用
下載后解壓,然后cmd進入bin目錄下,
然后執行命令,apollo.cmd create mybroker,
create mybroker1之后會在bin目錄下生成mybroker1文件夾,里面包含有很多信息,其中etc\apollo.xml文件下是配置服務器信息的文件,etc\users.properties文件包含連接MQTT服務器時用到的用戶名和密碼,默認賬號和密碼分別為,admin和password。
接著進入到mybroker1的bin目錄下,并執行命令apollo-broker.cmd run,啟動服務器。
最后可以在瀏覽器中輸入http://127.0.0.1:61680/查看是否安裝成功。
輸入用戶名:admin,密碼:password,登錄。
Springboot整合mqtt客戶端
Springboot和mqtt的依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>客戶端代碼
@Component public class client1 {public client1() throws MqttException {}//主題名String topic = "test1";//QoS服務質量等級int qos = 1;//訪問服務器地址private String broker="tcp://127.0.0.1:61613";//賬號String userName="admin";//密碼String password="password" ;String clientId = "Client1";// 內存存儲MemoryPersistence persistence = new MemoryPersistence();// 創建客戶端MqttClient sampleClient = new MqttClient(broker, clientId, persistence); /初始化設置訂閱的回調public void init(){sampleClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {}//當有訂閱的消息時會從這里接收public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("clien1收到主題為:"+topic+"的消息。------\n"+"消息為::"+new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------"+ token.isComplete());}});} /// public void connect() throws MqttException {// 創建鏈接參數MqttConnectOptions connOpts = new MqttConnectOptions();// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接connOpts.setCleanSession(true);// 設置連接的用戶名connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立連接IMqttToken iMqttToken = sampleClient.connectWithResult(connOpts);boolean r=iMqttToken.isComplete();if(r){System.out.println("client1連接到服務器成功");}else {System.out.println("client1連接到服務器失敗");}} public void publish(String mes) throws MqttException {// 創建消息MqttMessage message = new MqttMessage(mes.getBytes());// 設置消息的服務質量message.setQos(qos);// 發布消息sampleClient.publish(topic, message);} / public void subscribe(String topic) throws MqttException {//訂閱消息sampleClient.subscribe(topic, qos);}///public void disconnect() throws MqttException {// 斷開連接sampleClient.disconnect();// 關閉客戶端sampleClient.close();} }我一共建了3客戶端類,分別是client1、client2、client3,代碼基本上都一樣,上面的是client1。這三個類加上@Component注解后,在創建Spring容器時就會在容器中創建這三個類的實例,要使用時用@Autowired注解注入一下就好了。
然后對于client1我這里讓他發布消息時,主題默認是"test1",client2的默認為"test2",client3的默認為"test3"(偷個懶)。(client1、client2、client3代碼絕大部分都相同,之后topic和clientId這兩個變量不一樣)
Controller代碼
@RestController @RequestMapping("mqttDemo") public class MqttDemoController {@Autowiredpublic client1 c1;@Autowiredpublic client2 c2;@Autowiredpublic client3 c3;@RequestMapping("init")//先讓用戶client1、client2、client3連接上服務器public void init() throws MqttException {c1.init();c1.connect();c2.init();c2.connect();c3.init();c3.connect();}@RequestMapping("client1/sub/{topic}")//讓用戶client1訂閱一個話題public void client1_sub( @PathVariable String topic) throws MqttException {c1.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client1訂閱了名為:"+topic+"的。");}@RequestMapping("client1/pub/{mes}")//讓用戶主題client1發布一個消息(話題默認為"test1")public void client1_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c1.publish(mes);}/@RequestMapping("client2/sub/{topic}")//讓用戶client2訂閱一個話題public void client2_sub( @PathVariable String topic) throws MqttException {c2.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client2訂閱了名為:"+topic+"的主題。");}@RequestMapping("client2/pub/{mes}")//讓用戶client2發布一個消息(話題默認為"test2")public void client2_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c2.publish(mes);}@RequestMapping("client3/sub/{topic}")//讓用戶client3訂閱一個話題public void client3_sub( @PathVariable String topic) throws MqttException {c3.subscribe(topic);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"---client3訂閱了名為:"+topic+"的主題。");}@RequestMapping("client3/pub/{mes}")//讓用戶client3發布一個消息(話題默認為"test3")public void client_pub( @PathVariable String mes) throws MqttException {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));c3.publish(mes);}}通過游覽器輸入網址的方式執行這些業務方法。主要就是一個客戶都有兩個動作,即訂閱和發布。然后都是通過@PathVariable獲取路徑中的參數,效果都在控制臺打印出來。
啟動類
@SpringBootApplication public class SpringbootApplication {public static void main(String[] args) throws MqttException {SpringApplication.run(SpringbootApplication.class, args);}}演示
連接apollo服務器
先連接上apollo服務器,通過訪問http://localhost:8080/mqttDemo/init,執行MqttDemoController里的–init()–方法。
控制臺會打印信息,
去apollo的控制臺,檢查是否真的連接上了服務器。
連接成功。
客戶端之間的交互
連接到服務器后,客戶端之間就可以交互了。
比如我現在要讓client3訂閱一個名為test1的話題(這個話題的信息默認由client1發布),訪問http://localhost:8080/mqttDemo/client3/sub/test1,執行MqttDemoController里的–client3_sub()–方法。
訂閱成功。
然后現在我讓client1發一條信息(當然默認主題是test1),client3就會收到。訪問http://localhost:8080/mqttDemo/client1/pub/我是client1,執行MqttDemoController里的–client1_pub()–方法。
接收成功。
最后這里可以實現一對多、多對一,就不一一演示了。
遇到的問題
剛接觸mqtt這個協議不久,在做這個demo時遇到一個小問題,
就是在這個連接函數里的connOpts.setCleanSession(true);這句代碼的意思是每次連接到服務器都以新的身份連接,然后我一開始是false,導致我上次的測試時的客戶端之間的訂閱關系都被記錄了,影響了我下一次的測試。
比如,上次的測的時候client1訂閱了test2,然后這次測時,無論client有沒有訂閱test2,都會收到這個話題的信息。
源碼
demo源碼
總結
以上是生活随笔為你收集整理的Springboot整合mqtt客户端,实现客户端之间的交互(mqtt服务器为apollo1.7)【windows下]】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 项目管理过程标准及绩效考核
- 下一篇: hadoop三大组件