MQTT从入门到放弃
MQTT,是一種基于發布/訂閱模式的"輕量級"通訊協議,該協議構建于TCP/IP協議上,屬于應用層協議,。
基于TCP協議、發布/訂閱協議,屬于應用層協議。使用C/S架構,本質是一個消息轉發協議。所有的客戶端往服務器發送消息,然后服務端根據過濾規則,把消息再轉發給符合條件的客戶端。消息的傳輸是有序的、可靠的、雙向的。
一、概述
1.1 參考文檔
- 官方文檔 (推薦) http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
- 官方文檔(中文) http://mqtt.p2hp.com/mqtt311
1.2 MQTT優點
MQTT最大優點在于,可以以 極少的代碼和有限的帶寬 ,為遠程連接設備提過實時可靠的消息服務,作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用
- 保持長連接,具有一定實時性
- 適應高延時,偶爾斷網
- 支持高并發
- 單次數據量小
- 傳輸可靠
- 提供不同QoS(服務優先級)
- 設置遺囑消息
1.3 MQTT應用領域
MQTT是基于二進制消息的發布/訂閱編程模式的消息協議,非常適合
需要 低功耗 和 網絡帶寬有限 的IoT場景
比如: 遙感數據、汽車、 智能家居、智慧城市、醫療醫護、智慧農業 …
二、MQTT協議原理
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。
注意:消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。
2.1 MQTT客戶端
一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:
- 發布其他客戶端可能會訂閱的信息
- 訂閱其它客戶端發布的消息
- 退訂或刪除應用程序的消息
- 斷開與服務器連接
2.2 MQTT服務端
QTT服務器以稱為“消息代理”(Broker),可以是一個應用程序或一臺設備。它是位于消息發布者和訂閱者之間,它可以:
- 接受來自客戶的網絡連接
- 接受客戶發布的應用信息
- 處理來自客戶端的訂閱和退訂請求
- 向訂閱的客戶轉發應用程序消息
2.3 消息結構
每條MQTT命令消息的消息頭都包含一個固定的報頭,有些消息會攜帶一個可變報文頭和一個負荷。消息格式如下:
固定報文頭 | 可變報文頭 | 負載
2.3.1 固定報文頭
存在于所有MQTT數據包中,表示數據包類型及數據包的分組類標識。
MQTT固定報文頭最少有兩個字節,第一字節包含消息類型(Message Type)和QoS級別等標志位。第二字節開始是剩余長度字段,該長度是后面的可變報文頭加消息負載的總長度,該字段最多允許四個字節。
2.3.2 可變報文頭
存在于部分MQTT數據包中,數據包類型決定了可變頭是否存在及其具體內容。
可變報文頭主要包含協議名、協議版本、連接標志(Connect Flags)、心跳間隔時間(Keep Alive timer)、連接返回碼(Connect Return Code)、主題名(Topic Name)等。
2.3.3 負載
Payload直譯為負荷,消息的內容。存在于部分MQTT數據包中,表示客戶端收到的具體內容。
2.4 MQTT特點
2.4.1 MQTT的消息類型
固定報文頭中的第一個字節包含連接標志(Connect Flags),連接標志用來區分MQTT的消息類型。MQTT協議擁有14種不同的消息類型(見下表),可簡單分為連接及終止、發布和訂閱、QoS 2消息的機制以及各種確認ACK。至于每一個消息類型會攜帶什么內容,這里不多闡述
2.4.2 服務質量(QOS)
2.4.2.1 QOS分類
服務質量水平(QoS)是一個消息的發送者和限定遞送保證用于特定消息的消息的接收器之間的協議。MQTT 中有 3 個 QoS 級別:
- QoS0:發送就不管了,最多一次;
- QoS1:發送之后依賴MQTT規范,是否啟動重傳消息,所以至少一次;
- QoS2:發送之后依賴MQTT消息機制,確保只有一次。
QoS0 代表,Sender 發送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver發送消息,如果發送失敗,也就算了;這是完全依賴TCP重傳機制,如果網絡不好,TCP的重傳也不是100%可靠,加上MQTT是Publisher 發出去的消息是依賴代理服務器完成轉發,所以消息最多一次。
QoS1 代表,Sender 發送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver發送消息,如果發送之后沒有收到對應的PUBACK,就會繼續重試,直到發送者Sender 接收到 Receiver 發送的 PUBACK為止,因為重傳的原因,Receiver 有可能會收到重復的消息;
QoS2 代表,Sender 發送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發送消息,如果發送失敗,會繼續重試,直到 Receiver 收到消息為止,同時保證 Receiver 不會因為消息重傳而收到重復的消息。(個人理解這一點有點像TCP三次握手的交互過程)
2.4.2.2 QOS特性
- QoS 是 MQTT 協議的一個關鍵特性。QoS 使客戶端能夠選擇與其網絡可靠性和應用程序邏輯相匹配的服務級別。因為 MQTT 管理消息的重新傳輸并保證交付(即使底層傳輸不可靠),QoS 使不可靠網絡中的通信變得更加容易。
- QoS流,在發送端和接收端是兩件不同的事情。當然發送端與接收端QoS的等級也可以不一樣。在發送端與broker之間,發送端定義了QoS等級。當broker發送消息到接收端是,接收端決定了QoS的等級
- 發送(發布)消息的客戶端和接收消息的客戶端之間的 QoS 定義和級別是兩件不同的事情。這兩種交互的 QoS 級別也可以不同。向代理發送 PUBLISH 消息的客戶端定義消息的 QoS。但是,當代理將消息傳遞給接收者(訂閱者)時,代理使用接收者(訂閱者)在訂閱期間定義的 QoS。例如,客戶端 A 是消息的發送者。客戶端 B 是消息的接收者。如果客戶端 B 以 QoS 1 訂閱代理并且客戶端 A 以 QoS 2 向代理發送消息,則代理以 QoS 1 將消息傳遞給客戶端 B(接收者/訂閱者)。
2.4.2.3 QOS應用場景
QoS 0
- 發送方和接收方之間建立了完全或大部分穩定的連接。
- 不介意偶爾丟失幾條消息。如果數據不是那么重要或數據間隔很短,則某些消息的丟失是可以接受的
- 不需要消息隊列。僅當斷開連接的客戶端具有 QoS 1 或 2 和持久會話時,消息才會排隊
QoS 1
- 您需要獲取每條消息,并且您的用例可以處理重復項。QoS 級別 1 是最常用的服務級別,因為它保證消息至少到達一次,但允許多次傳遞。當然,您的應用程序必須容忍重復并能夠相應地處理它們。
- 無法承受 QoS 2 的開銷。QoS 1 傳遞消息的速度比 QoS 2 快得多。
QoS 2
- 支付場景。一次接收所有消息對您的應用程序至關重要。如果重復交付可能損害應用程序用戶或訂閱客戶端,則通常會出現這種情況。請注意開銷以及 QoS 2 交互需要更多時間才能完成。
關于QOS的優秀連接:
https://blog.csdn.net/m0_50668851/article/details/112555171
https://blog.csdn.net/qq1623803207/article/details/89518318
2.4.3 遺愿標志(Will Flag)
在可變報文頭的連接標志位字段(Connect Flags)里有三個Will標志位:Will Flag、Will QoS和Will Retain Flag,這些Will字段用于監控客戶端與服務器之間的連接狀況。如果設置了Will Flag,就必須設置Will QoS和Will Retain標志位,消息主體中也必須有Will Topic和Will Message字段。
那遺愿消息是怎么回事呢?
服務器與客戶端通信時,當遇到異常或客戶端心跳超時的情況,MQTT服務器會替客戶端發布一個Will消息。當然如果服務器收到來自客戶端的DISCONNECT消息,則不會觸發Will消息的發送。
因此,Will字段可以應用于設備掉線后需要通知用戶的場景。
2.4.4 連接保活心跳機制(Keep Alive Timer)
MQTT客戶端可以設置一個心跳間隔時間(Keep Alive Timer),表示在每個心跳間隔時間內發送一條消息。如果在這個時間周期內,沒有業務數據相關的消息,客戶端會發一個PINGREQ消息,相應的,服務器會返回一個PINGRESP消息進行確認。如果服務器在一個半(1.5)心跳間隔時間周期內沒有收到來自客戶端的消息,就會斷開與客戶端的連接。心跳間隔時間最大值大約可以設置為18個小時,0值意味著客戶端不斷開。
2.4.5 MQTT vs MQ
MQTT:一種通信協議,類似人類交談中的漢語、英語、俄語中的一種語言規范
MQ:一種通信通道,也叫消息隊列,類似人類交談中的用電話、email、微信的一種通信方式
市面上的MQ產品很多,如阿里自研并開源RocketMQ,還有類似RabbitMQ、ActiveMQ,他們不僅支持MQTT協議,還支持如AMQP、stomp協議等等,EMQ 使用的協議是mqtt。
| ActiveMQ | ActiveMQ是Apache軟件基金會的開源產品,支持AMQP協議、MQTT協議(和XMPP協議作用類似)、Openwire協議和Stomp協議等多種消息協議。并且ActiveMQ完整支持JMS API接口規范。 |
| RabbitMQ | RabbitMQ基于Erlang語言開發和運行。它與Apache ActiveMQ有很多相同的特性,例如RabbitMQ完整支持多種消息協議:AMQP、STOMP、MQTT、HTTP,我們使用RabbitMQ時會默認使用AMQP1.0 協議。當然,RabbitMQ作為Apache ActiveMQ最主要的競品之一也有其獨特的功能特性。例如RabbitMQ支持一套特有的Routing-Exchange消息路由規則。這套規則可以按照消息內容,自動將消息歸類到不同的消息隊列中。 |
2.4.6 協議對比
下圖是各個協議間的對比:
MQTT協議(低帶寬)
MQTT (Message Queuing Telemetry Transport ),消息隊列遙測傳輸,由IBM開發的即時通訊協議,相比來說比較適合物聯網場景的通訊協議。MQTT協議采用發布/訂閱模式,所有的物聯網終端都通過TCP連接到云端,云端通過主題的方式管理各個設備關注的通訊內容,負責將設備與設備之間消息的轉發。
適用范圍:在低帶寬、不可靠的網絡下提供基于云平臺的遠程設備的數據傳輸和監控。
MQTT協議一般適用于設備數據采集到端(Device-》Server,Device-》Gateway),集中星型網絡架構(hub-and-spoke),不適用設備與設備之間通信,設備控制能力弱,另外實時性較差,一般都在秒級。
AMQP協議(互操作性)
AMQP(Advanced Message Queuing Protocol),先進消息隊列協議,這是OASIS組織提出的,該組織曾提出OSLC(Open Source Lifecyle)標準,用于業務系統例如PLM,ERP,MES等進行數據交換。
適用范圍:最早應用于金融系統之間的交易消息傳遞,在物聯網應用中,主要適用于移動手持設備與后臺數據中心的通信和分析。
XMPP協議(即時通信)
XMPP(Extensible Messaging and Presence Protocol)可擴展通訊和表示協議,XMPP的前身是Jabber,一個開源形式組織產生的網絡即時通信協議。XMPP目前被IETF國際標準組織完成了標準化工作。
適用范圍:即時通信的應用程序,還能用在網絡管理、內容供稿、協同工具、檔案共享、游戲、遠端系統監控等。
JMS (Java Message Service)
Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持
JMS是協議同時也是 Java 消息服務規范的標準實現,同時也是 Java 企業版(JEE)規范的一部分。
優秀連接 https://blog.csdn.net/gyshun/article/details/83036987
2.5 消息持久化
需要滿足以下三個條件:
1、cleanSession = false
2、clientId不為空
3、客戶端subscribe時的Qos=1,發布端publish時的Qos=1
2.6 ※ 實現方式!!!
參考鏈接:https://blog.51cto.com/u_15067242/2574302
MQTT客戶端采用的是Spring Intergration和Eclipse.paho的方式實現的。當然,你也可以直接使用Eclipse.paho作為客戶端連接。
2.6.1 Spring Intergration
官方的說法我就不過多的解釋了,比較晦澀,這里我談一下自己的理解。其實Spring Intergration就類似一個水電系統。總閘、各樓層的控制、分流、聚合、過濾、沉淀、消毒、排污,這里的每一個環節都類似一個系統服務,可能是MQTT,可能是Redis,可能是MongoDB,可能是Job,可能是我們系統服務的任何一個模塊。那么Spring Intergration扮演的角色就是將這些功能能夠連接起來組成一個完整的服務系統,實現企業系統的集成的解決方案。就像管道一樣各個模塊連接到起,管道能夠連接到千家萬戶需要很多水表、分頭管、水龍頭,管道開關等等這些都是Spring Intergration的主要組件。
<!-- mqtt依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>spring-integration-mqtt內部依賴了Eclipse.paho的包,所以不需要在單獨引入
關于Spring Intergration的版本問題,其官方文檔:官方文檔連接,值得深看!
這里是對官方文檔的部分謄寫&理解:
???????從4.1版本開始,編程方式改變適配器訂閱的主題可以省略url,DefaultMqttPahoClientFactory屬性serverURIs可以提供服務端URI,例如,這將使能連接至HA高可用簇。
???????從4.2.2版本開始,當適配器成功訂閱至主題后,發布MqttSubscribedEvent,當連接/訂閱失敗時,發布MqttConnectionFailedEvent。這些事件可以由實現ApplicationListener接口的實體類獲取。
???????新的屬性recoveryInterval控制在故障之后適配器會嘗試重新連接的時間間隔,默認為10000ms(10s)
???????在4.2.3版本之前,當適配器停止后,客戶端總是會解除訂閱,這是不正確的。 ,因為如果客戶端QoS大于0,我們需要保持訂閱以便適配器停止時到達的消息在下一次開始時會傳送。這也需要設置客戶端工廠cleanSession屬性為false,默認值為true。
???????從4.2.3版本開始,如果cleanSession值為false,適配器不會解除訂閱(默認)。可以重寫該行為,通過設置工廠屬性consumerCloseAction,可以有以下值:UNSUBSCRIBE_ALWAYS,UNSUBSCRIBE_NEVER以及UNSUBSCRIBE_CLEAN,后者(默認)會解除訂閱僅當cleanSession屬性值為true。回退至4.2.3之前的行為,使用UNSUBSCRIBE_ALWAYS。
2.6.2 Eclipse.paho
Eclipse.paho是基于IMqttClient和IMqttAnsycClient接口實現的MQTT客戶端中間件。其內部實現了完整的消息發布與訂閱、socket長連接、心跳機制、斷線重連以及消息本地緩存等一系列功能。是目前比較主流的MQTT客戶端中間件。
2.7 重連
MQTT有個自動重連功能。有兩種方式可以實現自動重連。
2.7.1 使用Spring Intergration方式
基于 MQTT連接配置類MqttConnectOption類可以設置自動重連。
// 斷開后重連,但這個方法并沒有重新訂閱的機制 // 在嘗試重新連接之前,它將首先等待1秒,對于每次失敗的重新連接嘗試,延遲將加倍,直到達到2分鐘,此時延遲將保持在2分鐘。 options.setAutomaticReconnect(true);// 接受離線消息 告訴代理客戶端是否要建立持久會話 false為建立持久會話 mqttConnectOptions.setCleanSession(Boolean.FALSE);若使用了Spring Intergration方式實現mqtt客戶端,那么只用將setAutomaticReconnect設置為true,setCleanSession設置為false即可。
原因:
因此,只要保證這兩點,mqtt即可斷線重連。
2.7.2 使用Eclipse.paho(mqttv3)方式
------------------------------------------------------或者------------------------------------------------------
方法二:在connectionLost () 回調函數中自定義重新連接、重新訂閱
2.8 連接、斷開通知(踩坑處)
坑一: 博主博人嘗試了使用Spring Intergration方式,再加上實現接口MqttCallbackExtended /MqttCallback,發現并沒有在連接斷開、重連、收到消息的時候進入該方法。再看MqttCallbackExtended /MqttCallback這兩個接口都在package org.eclipse.paho.client.mqttv3包下,因此推斷該方式僅限用于mqttv3方式下使用!!!
坑二 :在運用過程中,會出現斷開連接第一次重連成功后,一直斷開連接再重連再斷開連接再重連的死循環中
- 問題原因:
創建了相同clientid 的MqttClient。 - 問題解析:
因為clientid是MqttClient的唯一標識,當重新new上一個clientid相同的MqttClient就會出現重連時創建的MqttClient使程序中初始化時創建的MqttClient斷開連接,斷開連接后就會回滾到connectionLost方法中,然后此方法中又會繼續重連。 - 解決辦法:
1.不需要重新new一個MqttClient,只需要調用connect()方法就OK了。
2.在創建clientId的時候,最后添加上隨機數,那樣每次都是不同的clientId
2.8.1 MqttCallback
官方文檔鏈接 在此,惡靈退散~~~~
MqttCallback:使應用程序能夠在與客戶端相關的異步事件發生時得到通知。實現此接口的類可以在兩種類型的客戶端上注冊:IMqttClient.setCallback(MqttCallback) 和IMqttAsyncClient.setCallback(MqttCallback)
public interface MqttCallback {//當與服務器的連接丟失時調用此方法。public void connectionLost(Throwable cause);//當消息從服務器到達時調用此方法。public void messageArrived(String topic, MqttMessage message) throws Exception;//當消息的傳遞完成并收到所有確認時調用。public void deliveryComplete(IMqttDeliveryToken token); }2.8.2 MqttCallbackExtended
官方文檔鏈接 在此,萬國臣服~~~~
MqttCallbackExtended是paho.mqtt.java客戶端需要監控連接狀態變更事件,以進行異常維測和處理所提供的接口。
MqttCallbackExtended接口繼承了MqttCallback接口,并在其基礎上,新增方法:
參數:
- reconnect- 如果為真,則連接是自動重新連接的結果。
- serverURI- 建立連接的服務器 URI。
2.9 動態訂閱主題
其實,我們在使用在 Spring Intergration&采用工廠模式,初始化訂閱者時,已經預先設置了主題
@Beanpublic MessageProducer inbound() {// 可以同時消費(訂閱)多個Topic// Paho客戶端消息驅動通道適配器,主要用來訂閱主題 對inboundTopics主題進行監聽MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId,mqttClientFactory(), consumerTopic);adapter.setCompletionTimeout(timeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);// 設置訂閱通道adapter.setOutputChannel(mqttInboundChannel());//adapter.setErrorChannel();return adapter;}其中consumerTopic在初始化后就可以訂閱,但是不符合 ‘在使用過程中想訂閱新的主題’ 的使用場景,因此這里借助了MqttPahoMessageDrivenChannelAdapter的添加/刪除主題 的方法
注意:MessageProducer類有沒有提供“添加主題”的方法,在實例化bean過程中,真正對象是MqttPahoMessageDrivenChannelAdapter的實例對象,因此可以斷定MessageProducer是MqttPahoMessageDrivenChannelAdapter的引用類,即父類。
因此,為了使用MqttPahoMessageDrivenChannelAdapter的addTopic(String topic, int qos)、 removeTopic(String... topic)的方法有以下兩種方式:
三、MQTT代碼
3.1 MQTT配置類
package com.ruoyi.common.config;import com.alibaba.fastjson.JSON; import com.ruoyi.zy.mqtt.MqttConsumer; import com.ruoyi.zy.mqtt.ZyMqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;@Configuration public class MqttConfig {private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);@AutowiredZyMqttCallback zyMqttCallback;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String hostUrl;@Value("${mqtt.producerClientId}")private String producerClientId;@Value("${mqtt.producerTopic}")private String producerTopic;//生產者和消費者是單獨連接服務器會使用到一個clientid(客戶端id),// 如果是同一個clientid的話會出現Lost connection: 已斷開連接; retrying...@Value("${mqtt.consumerClientId}")private String consumerClientId;@Value("${mqtt.consumerTopic}")private String[] consumerTopic;@Value("${mqtt.timeout}")private int timeout;@Value("${mqtt.keepalive}")private int keepalive;//入站通道名(消費者)訂閱的bean名稱public static final String CHANNEL_NAME_IN = "mqttInboundChannel";//出站通道名(生產者)發布的bean名稱public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";/*** MQTT連接器基本信息選項** @return {@link MqttConnectOptions}*/@Beanpublic MqttConnectOptions getMqttConnectOptions() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());// 設置連接地址,支持集群模式mqttConnectOptions.setServerURIs(new String[]{hostUrl});mqttConnectOptions.setKeepAliveInterval(keepalive);// 接受離線消息 告訴代理客戶端是否要建立持久會話 false為建立持久會話mqttConnectOptions.setCleanSession(Boolean.FALSE);//設置重連機制mqttConnectOptions.setAutomaticReconnect(true);// 設置遺囑消息MqttMessage mqttMessage = new MqttMessage();mqttMessage.setPayload("bit_plate offline".getBytes());System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!"+JSON.toJSONString(mqttMessage).getBytes());mqttConnectOptions.setWill("topic_offline", JSON.toJSONString(mqttMessage).getBytes(), 1, true);return mqttConnectOptions;}/*** MQTT客戶端. 創建MqttPahoClientFactory,設置MQTT Broker連接屬性,如果使用SSL驗證,也在這里設置。** @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*******************************生產者*******************************************//*** MQTT信息通道(生產者)*/@Bean(name = CHANNEL_NAME_OUT)public MessageChannel mqttOutboundChannel() {//使用點對點模型,消息管道類型DirectChannelreturn new DirectChannel();}/*** MQTT消息處理器(生產者)* <p>* ServiceActivator注解表明:當前方法用于處理MQTT消息,outputChannel參數指定了用于生產消息的channel。*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(producerClientId, mqttClientFactory());// 如果設置成true,即異步,發送消息時將不會阻塞。messageHandler.setAsync(true);messageHandler.setDefaultTopic(producerTopic);return messageHandler;}/*******************************消費者*******************************************//*** MQTT信息通道(消費者)*/@Bean(name = CHANNEL_NAME_IN)public MessageChannel mqttInboundChannel() {return new DirectChannel();}/*** MQTT消息訂閱綁定(消費者)*/@Beanpublic MessageProducer inbound() {//管道適配器。因為外部協議有無數種,消息適配器則用于連接不同協議的外部系統。從外部系統讀入數據并對數據進行處理最終// 與Spring Integration 內部的消息系統適配。例如將要進行Mqtt集成,那么就需要一個Mqtt的管道適配器,// 可以同時消費(訂閱)多個Topic// Paho客戶端消息驅動通道適配器,主要用來訂閱主題 對inboundTopics主題進行監聽MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(consumerClientId,mqttClientFactory(), consumerTopic);adapter.setCompletionTimeout(timeout);//編解碼器。該方法用與對消息負載進行編解碼。-----可自定義,但是需要實現MqttMessageConverter接口!!!adapter.setConverter(new DefaultPahoMessageConverter());// 設置消息的服務質量// 0:最多一次傳送 (只負責傳送,發送過后就不管數據的傳送情況)// 1:至少一次傳送 (確認數據交付)// 2:正好一次傳送 (保證數據交付成功)adapter.setQos(1);// 設置訂閱通道adapter.setOutputChannel(mqttInboundChannel());//adapter.setErrorChannel();return adapter;}/*** MQTT消息處理器(消費者)*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)public MessageHandler handler() {return new MqttConsumer();}//如果我要配置多個client,只要配置多個通道即可//通道2 // @Bean // public MessageChannel mqttInputChannelTwo() { // return new DirectChannel(); // } // //配置client2,監聽的topic:hell2,hello3 // @Bean // public MessageProducer inbound1() { // MqttPahoMessageDrivenChannelAdapter adapter = // new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), // "hello2","hello3"); // adapter.setCompletionTimeout(timeout); // adapter.setConverter(new DefaultPahoMessageConverter()); // adapter.setQos(1); // adapter.setOutputChannel(mqttInputChannelTwo()); // return adapter; // } // // //通過通道2獲取數據 // @Bean // @ServiceActivator(inputChannel = "mqttInputChannelTwo") // public MessageHandler handlerTwo() { // return new MqttConsumer(); // } }3.2 MQTT生產者
package com.ruoyi.zy.mqtt;import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;/*** @description mqtt生產者* @since 2022/5/24 10:16*/ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttProducer {void sendToMqtt(String data);void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}3.2 MQTT消費者
package com.ruoyi.zy.mqtt;import cn.hutool.core.convert.Convert; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.exception.CustomException; import com.ruoyi.common.utils.HarWarUtils; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.zy.annotion.Zy5gDataField; import com.ruoyi.zy.constants.MqttConstants; import com.ruoyi.zy.constants.MsgIdEnum; import com.ruoyi.zy.dto.body.accept.AcceptBootBody; import com.ruoyi.zy.dto.body.accept.AcceptHeartBody; import com.ruoyi.zy.dto.data.accept.AcceptBootData; import com.ruoyi.zy.service.ZyService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component;import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List;import static cn.hutool.core.util.ReflectUtil.getMethod; import static com.ruoyi.kwt.uav5g.message.Kwt5gData.getFieldsType;/*** @description mqtt消費者* @since 2022/5/24 11:13*/ @Component @ConditionalOnProperty(value = "mqtt.enable", havingValue = "true") public class MqttConsumer implements MessageHandler {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate ZyService zyService;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {try {String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));//獲取消息內容String msgStr = String.valueOf(message.getPayload());logger.info("接收到mqtt消息,主題:{} 消息:{}", topic, msgStr);if (StringUtils.isEmpty(msgStr)) {logger.error("錯誤!接收到 mqtt消息,主題:{} 消息為空", topic);return;}JSONObject jsonObj = (JSONObject) JSONObject.parse(msgStr);Integer msg_id = jsonObj.getJSONObject(MqttConstants.HEAD).getInteger(MqttConstants.MSG_ID);if (null == msg_id) {throw new CustomException("mqtt收到消息錯誤!");}switch (MsgIdEnum.getByMsgCode(msg_id)) {case BOOT:handleBootBody(jsonObj);break;case HEART:handleHeartBody(jsonObj);break;case LIVE:handleLiveBody(jsonObj);break;case C2:handleC2Body(jsonObj);break;case FLY:handleFlyBody(jsonObj);break;default:throw new CustomException("mqtt收到消息錯誤!未識別對應的 msg_id:{}", msg_id);}} catch (CustomException ce) {logger.error(ce.getMessage());} catch (Exception e) {e.printStackTrace();}}public void handleBootBody(JSONObject jsonObj) {try {AcceptBootBody acceptBootBody = JSON.toJavaObject(jsonObj, AcceptBootBody.class);AcceptBootData data = acceptBootBody.getData();Field[] fields = data.getClass().getDeclaredFields();for (Field field : fields) {Zy5gDataField ann = field.getAnnotation(Zy5gDataField.class);if (ann == null) {continue;}//json字段解析Class fieldType = field.getType();Object originalObj = jsonObj.getJSONObject(MqttConstants.DATA).get(field.getName());Object newObj = bodyParseObjValue(fieldType, originalObj, ann.ratio());field.setAccessible(true);field.set(data, newObj);}//處理了* / 倍率之后的正確數值zyService.handleBootBody(acceptBootBody);} catch (Exception e) {e.printStackTrace();}}private static Object bodyParseObjValue(Class<?> type, Object originalObj, float ratio) throws Exception {//數組if (originalObj instanceof List) {List bodyList = Convert.toList(type.newInstance().getClass(), originalObj);List destList = new ArrayList<>();for (int k = 0; k < bodyList.size(); k++) {Object test = type.newInstance();Field[] fields = type.getDeclaredFields();for (Field field : fields) {Zy5gDataField fieldAnnotation = field.getAnnotation(Zy5gDataField.class);if (fieldAnnotation == null) {continue;}Class<?> fieldType = getFieldsType(field);String methodName = field.getName().substring(0, 1).toUpperCase() + field.getName().substring(1);String getMethodName = "get" + methodName;Method m = getMethod(type, getMethodName);Object obj = m.invoke(bodyList.get(k));Object newObj = harWarJsonParseObjValueBasic(fieldType, obj, fieldAnnotation.ratio());field.set(test, newObj);}destList.add(test);}return destList;}//基類(僅數字相關)Object newObj = harWarJsonParseObjValueBasic(type, originalObj, ratio);//String等其余非基本類型返回原值return newObj;}/*** 解析json字段*/private static Object harWarJsonParseObjValueBasic(Class<?> type, Object originalObj, float ratio) throws Exception {if (HarWarUtils.isEmpty(originalObj) || ratio == 1) {return originalObj;}if (ratio == 0) {throw new CustomException(type + "參數的倍率ratio不能為0!");}if (type.equals(int.class) || type.equals(Integer.class)) {originalObj = (Integer) originalObj / ratio;} else if (type.equals(float.class) || type.equals(Float.class)) {originalObj = Float.parseFloat(originalObj.toString()) / ratio;} else if (type.equals(double.class) || type.equals(Double.class)) {originalObj = Double.parseDouble(originalObj.toString()) / ratio;} else if (type.equals(long.class) || type.equals(Long.class)) {originalObj = Long.parseLong(originalObj.toString()) / ratio;} else if (String.class.isAssignableFrom(type)) {originalObj = originalObj.toString();} else {throw new Exception("不支持的數據類型,type=" + type);}return originalObj;}public void handleHeartBody(JSONObject jsonObj) {AcceptHeartBody acceptHeartBody = JSON.toJavaObject(jsonObj, AcceptHeartBody.class);zyService.handleHeartBody(acceptHeartBody);}public void handleLiveBody(JSONObject jsonObj) {}public void handleC2Body(JSONObject jsonObj) {}public void handleFlyBody(JSONObject jsonObj) {}}總結
以上是生活随笔為你收集整理的MQTT从入门到放弃的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互联网公司忽悠员工的黑话,套路太深了。。
- 下一篇: 操作系统基础知识笔记