反应式系统实现MQTT客户机
反應式系統實現MQTT客戶機
Implementing an MQTT client for reactive systems
MQTT Reactive是從LiamBindle的MQTT-C庫派生的MQTT v3.1.1客戶機。MQTT-Reactive的目的是提供一個用C語言編寫的可移植、無阻塞的MQTT客戶機,以便在反應式嵌入式系統中使用。首先,本文解釋了什么是反應式系統。然后,介紹了如何設計適合該類系統的軟件結構。最后,本文展示了如何通過使用狀態機和事件驅動范式在反應式系統中使用MQTT反應式庫。為了做到這一點,本文以一個實際的物聯網設備為例,通過使用UML圖(如狀態機、交互和結構)來解釋其軟件結構和基于狀態的行為。本文還提供了用C語言實現IoT設備的MQTT反應式客戶端的指南。
許多嵌入式系統是被動的,即它們對內部或外部事件做出反應。一旦這些反應完成,軟件就會返回等待下一個事件。這就是為什么事件驅動系統被稱為反應系統。
事件驅動編程,或稱反應式編程,是實現反應式系統靈活、可預測和可維護軟件的最合適的編程范式之一。在這個范例中,程序的流程是由事件決定的。通常,反應式軟件的結構是由多個并發單元(稱為活動對象)組成,這些單元等待并處理不同類型的事件。每個活動對象都擁有一個控制線程和一個事件隊列,通過它來處理傳入的事件。在反應式系統中,活動對象通常具有狀態圖中定義的基于狀態的行為。
為了探索如何在具有多個并發任務的反應式系統中使用MQTT反應庫,同時使用狀態機和事件驅動范式,我們以物聯網設備為例。
使用MQTT協議的想法是在一家鐵路公司開發物聯網設備時產生的。該裝置是一個清晰的反應系統,能夠:
檢測并存儲多個數字輸入的變化
采集、濾波和存儲多個模擬信號
定期將存儲的信息發送到遠程服務器
在GSM網絡上通過MQTT協議發送和接收信息
之所以選擇MQTT,是因為它是一種基于發布者和訂戶的輕量級消息傳遞協議,通常用于物聯網和網絡應用中,在這些應用中,預期會出現高延遲和低數據速率鏈路,例如GSM網絡。
上述物聯網設備的MQTT功能是通過使用LiamBindle的MQTT-C的修改版本實現的。由于該設備的軟件設計為反應式軟件,因此必須修改MQTT-C,以便通過交換異步事件與系統的其余部分進行通信。這些事件用于通過網絡接收和發送流量,以及連接敏感信息并將其發布到服務器。生成的軟件庫稱為MQTT Reactive。
狀態機
MQTT Reactive是通過一個狀態機使用的,如圖1所示,該狀態機為MQTT反應式客戶機的基本行為建模。它是一個名為MqttMgr(MQTT管理器)的活動對象。圖1中的狀態機操作演示了如何從狀態機使用MQTT反應庫。即使在圖1中使用C語言作為動作語言,也可以使用任何計算機或形式語言。
Figure 1. State machine of an MQTT-Reactive client
圖1中的狀態機以WaitingForNetConnection狀態啟動。當網絡連接建立到等待狀態后,服務器將接收到等待連接狀態。只有在這種狀態下,狀態機才能分別通過CONNECT和PUBLISH事件將MQTT消息發送到代理,例如CONNECT或PUBLISH。同步狀態使用UML的特殊機制來延遲發布事件,發布事件由同步狀態的內部分區中包含的defer關鍵字指定。如果發布事件在Sync為當前狀態時發生,則它將被保存(延遲)以供將來處理,直到SM進入發布事件不在其延遲事件列表(例如WaitingForSync或WaitingForNetConnection)中的狀態。進入這些狀態后,狀態機將自動調用任何保存的發布事件,然后根據轉換目標狀態使用或丟棄此事件。
每同步一毫秒,狀態機就轉換到Sync composite狀態,該狀態通過向網絡管理器發布接收和發送事件來實際發送和接收來自網絡的流量。它是一個處理網絡問題的并發實體。
盡管引入的MqttMgr只支持CONNECT和PUBLISH包,但它可以通過相當簡單的更改來支持SUBSCRIBE包。
狀態機操作使用params關鍵字訪問已消費事件的參數。例如,在以下轉換中,Connect事件攜帶兩個參數clientId和keepAlive,它們的值用于更新相應的MqttMgr對象的屬性:
Connect(clientId, keepAlive)/
me->clientId = params->clientId;
me->keepAlive = params->keepAlive;
me->operRes = mqtt_connect(&me->client, me->clientId, NULL, NULL, 0,
NULL, NULL, 0, me->keepAlive);
在本例中,Connect(clientId,keepAlive)事件是轉換的觸發器,mqtt_Connect()調用是作為結果執行的操作的一部分。換句話說,當MqttMgr對象接收到參數為“publishing_client”和“400”,Connect(“publishing_client”,400)的Connect(clientId,keepAlive)事件時,MqttMgr的clientId和keepAlive屬性會相應地更新為值“publishing\u client”和“400”。
為了創建和發送事件,狀態機的操作使用GEN()宏。例如,以下語句將接收事件發送到收集器對象,該對象被其收集器指針引用為MqttMgr對象的屬性:
GEN(me->itsCollector, Receive());
GEN()語句的第一個參數是接收事件的對象,而第二個參數是要發送的事件,包括事件參數(如果有)。參數必須與事件參數一致。例如,以下語句生成一個ConnRefused(code)事件,并將其發送到收集器對象,將代理返回的代碼作為事件參數傳遞:
GEN(me->itsCollector, ConRefused(code));
使用params關鍵字訪問所消費事件的參數和GEN()宏從操作中生成事件的想法是從Rational Rhapsody Developer’s code開發人員的代碼生成器中純粹出于說明目的而采用的。
圖1中狀態機的默認操作設置每當從代理接收到連接接受時MQTT Reactive調用的回調。此回調應在MqttMgr代碼中實現。此回調必須生成ConnAccepted or ConnRefused(code)事件,以便發送到收集器對象,如下所示。
static voidconnack_response_callback(enum MQTTConnackReturnCode return_code)
{ /…/ if (return_code == MQTT_CONNACK_ACCEPTED)
{ GEN(me->itsCollector, ConnAccepted()); }
else
{ GEN(me->itsCollector, ConnRefused(return_code)); }}
模型實施
圖1中的模型可以用C或C++實現,或者使用您最喜歡的軟件工具,或者只使用自己的狀態機實現。在因特網上有不同的工具可以實現這一點,例如RKH框架、QP框架、Yakindu Statechart工具或RationalRhapsody Developer等等。它們都支持狀態圖和C/C++語言。此外,其中一些還包括繪制狀態圖并從中生成代碼的工具。
此狀態機是從名為MqttMgr(MQTT管理器)的活動對象執行的,它提供了MQTT反應式代碼的嚴格封裝,并且它是唯一允許調用任何MQTT反應式函數或訪問MQTT反應式數據的實體。系統中的其他并發實體以及任何isr只能通過與MqttMgr交換事件來間接使用MQTT Reactive。使用這種機制來同步并發實體并在它們之間共享數據避免了傳統阻塞機制(如信號量、互斥體、延遲或事件標志)的危險。這些機制可能會導致難以診斷和修復的意外故障。
MqttMgr活動對象將其屬性封裝為一組數據項。數據項用名稱和類型指定變量,其中類型實際上是數據類型。MqttMgr對象的數據項映射到對象結構的成員。成員的名稱和類型與對象數據的名稱和類型相同。例如,MqttMgr對象類型的client屬性作為數據成員嵌入到MqttMgr結構中:
struct MqttMgr { /* … / struct mqtt_client client; / attribute client / LocalRecvAll localRecv; / attribute localRecv */};
直接訪問和修改MqttMgr對象的數據,而不使用訪問器或賦值器操作。例如,客戶機和localRecv通過me指針訪問,該指針指向MqttMgr的實例。
mqtt_recvMsgError(&me->client, &me->localRecv);
MqttMgr具有表1中顯示的屬性列表。
圖2中的結構有助于記住相關參與者之間的關系。它們是:Collector對象,它希望向代理發送信息;NetMgr對象,它處理網絡;以及MqttMgr對象。
Figure 2. Draft of IoT system structure
圖3中的序列圖顯示了當需要MqttMgr對象打開與MQTT服務器的會話時,它是如何與系統其余部分交互的。在此圖中,MqttMgr狀態和交換的異步消息顯示在收集器、MqttMgr和NetMgr參與者之間。
NetMgr對象建立到代理的網絡連接后,從MqttMgr發送到MQTT服務器的第一個數據包必須是CONNECT數據包。因此,收集器actor向MqttMgr
actor發送一個Connect(clientId,keepAlive)事件。此事件必須攜帶客戶端標識符和保持活動時間間隔。如果服務器接受連接請求,MqttMgr actor將向收集器actor發送ConnAccepted事件以通知此情況。從那時起,收集器參與者可以向該代理發布信息消息。
如果服務器拒絕連接請求,MqttMgr actor將向收集器actor發送一個conndrekend事件。此事件附帶一個代碼,用于通知拒絕原因,如圖4所示。
Figure 4. Broker rejects a connection request
圖5顯示了消息發布時的交互流。為了做到這一點,收集器參與者發送一個發布(data,size,topic,qos)事件,該事件攜帶要發布的信息(data)、信息的長度(以字節為單位)、信息將被發布到的主題名稱(topic)以及傳遞該消息的保證級別(qos)。在前面提到的IoT設備中,發布的信息是使用JSON規范格式化的。它是一種開放的標準格式,在人類可讀文本中包含具有屬性值對的數據對象。這種格式是使用jWrite實現的,jWrite是一個用C編寫的簡單而輕量級的庫。
Figure 5. Publishing data to a broker
圖6顯示了一個場景,其中MQTT消息的接收和發送失敗。如果網絡管理器無法從網絡接收流量,它將向MqttMgr actor發送ReceiveFail。類似地,如果網絡管理器不能向網絡發送數據,它將向MqttMgr actor發送SendFail。
Figure 6. Failures in network
表2總結了所示場景中涉及的事件。
結論
通過避免傳統阻塞機制(如信號量、互斥、延遲或事件標志)的危險,本文提出的MQTT反應庫、狀態機和軟件體系結構允許反應式嵌入式系統以新穎的方式實現MQTT客戶機。它是通過將MQTT反應式代碼封裝在稱為active object的并發單元中實現的,active object基于狀態的行為在建議的狀態機中定義。此活動對象通過交換異步事件與系統的其余部分進行通信:不僅用于在網絡上接收和發送流量,還用于將信息連接并發布到用于物聯網應用程序的服務器。
總結
以上是生活随笔為你收集整理的反应式系统实现MQTT客户机的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 为什么您应该使用基于标准的开发实践
- 下一篇: 从C到C++过渡的3个原因