常用 MQTT 客户端库简介
前言
MQTT 是一個輕量的發布訂閱模式消息傳輸協議,專門針對低帶寬和不穩定網絡環境的物聯網應用設計。MQTT 基于發布/訂閱范式,工作在 TCP/IP協議族上,MQTT 協議輕量、簡單、開放并易于實現,這些特點使它適用范圍非常廣泛。
MQTT 基于客戶端-服務器通信模式,MQTT 服務端稱為 MQTT Broker,目前行業內可選的 MQTT Broker 較多,其優劣與功能差別比較本文不再贅述。本文以開源社區中最流行的 MQTT 消息服務器 EMQ X 為例,使用 EMQ 提供的公共 Broker broker.emqx.io ,通過一個簡單客戶端連接 Broker 并發布、處理消息的例子,整理總結不同編程語言、平臺下 MQTT 客戶端庫的使用方式與樣例。
入選客戶端庫如下:
- Eclipse Paho C 與 Eclipse Paho Embedded C
- Eclipse Paho Java Client
- Eclipse Paho MQTT Go client
- emqtt : EMQ 提供的 Erlang MQTT 客戶端庫
- MQTT.js Web 端 & Node.js 平臺 MQTT 客戶端
- Eclipse Paho Python
MQTT 社區收錄了許多 MQTT 客戶端庫,讀者可以在此處查看。
樣例應用介紹
MQTT 客戶端整個生命周期的行為可以概括為:建立連接、訂閱主題、接收消息并處理、向指定主題發布消息、取消訂閱、斷開連接。
標準的客戶端庫在每個環節都暴露出相應的方法,不同庫在相同環節所需方法參數含義大致相同,具體選用哪些參數、啟用哪些功能特性需要用戶深入了解 MQTT 協議特性并結合實際應用場景而定。
本文以一個客戶端連接并發布、處理消息為例,給出每個環節大致需要使用的參數:
- 建立連接:
- 指定 MQTT Broker 基本信息接入地址與端口
- 指定傳輸類型是 TCP 還是 MQTT over WebSocket
- 如果啟用 TLS 需要選擇協議版本并攜帶相應的的證書
- Broker 啟用了認證鑒權則客戶端需要攜帶相應的 MQTT Username Password 信息
- 配置客戶端參數如 keepalive 時長、clean session 回話保留標志位、MQTT 協議版本、遺囑消息(LWT)等
- 訂閱主題:連接建立成功后可以訂閱主題,需要指定主題信息
- 指定主題過濾器 Topic,訂閱的時候支持主題通配符 + 與 # 的使用
- 指定 QoS,根據客戶端庫和 Broker 的實現可選 Qos 0 1 2,注意部分 Broker 與云服務提供商不支持部分 QoS 級別,如 AWS IoT 、阿里云 IoT 套件、Azure IoT Hub 均不支持 QoS 2 級別消息
- 訂閱主題可能因為網絡問題、Broker 端 ACL 規則限制而失敗
- 接收消息并處理:
- 一般是在連接時指定處理函數,依據客戶端庫與平臺的網絡編程模型不同此部分處理方式略有不同
- 發布消息:向指定主題發布消息
- 指定目標主題,注意該主題不能包含通配符 + 或 #,若主題中包含通配符可能會導致消息發布失敗、客戶端斷開等情況(視 Broker 與客戶端庫實現方式)
- 指定消息 QoS 級別,同樣存在不同 Broker 與平臺支持的 QoS 級別不同,如 Azure IoT Hub 發布 QoS 2 的消息將斷開客戶端連接
- 指定消息體內容,消息體內容大小不能超出 Broker 設置最大消息大小
- 指定消息 Retain 保留消息標志位
- 取消訂閱:
- 指定目標主題即可
- 斷開連接:
- 主動斷開連接,將發布遺囑消息(LWT)
Eclipse Paho C 與 Eclipse Paho Embedded C
Eclipse Paho C 與 Eclipse Paho Embedded C 均為 Eclipse Paho 項目下的客戶端庫,均為使用 ANSI C 編寫的功能齊全的 MQTT 客戶端,Eclipse Paho Embedded C 可以在桌面操作系統上使用,但主要針對 mbed,Arduino和 FreeRTOS 等嵌入式環境。
該客戶端有同步/異步兩種 API ,分別以 MQTTClient 和 MQTTAsync 開頭:
- 同步 API 旨在更簡單,更有用,某些調用將阻塞直到操作完成為止,使用編程上更加容易;
- 異步 API 中只有一個調用塊 API-waitForCompletion ,通過回調進行結果通知,更適用于非主線程的環境。
兩個庫的下載、使用詳細說明請移步至項目主頁查看,本文使用 Eclipse Paho C,直接提供樣例代碼如下:
#include "stdio.h" #include "stdlib.h" #include "string.h"#include "MQTTClient.h"#define ADDRESS "tcp://broker.emqx.io:1883" #define CLIENTID "emqx_test" #define TOPIC "testtopic/1" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000Lint main(int argc, char* argv[]) {MQTTClient client;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token;int rc;MQTTClient_create(&client, ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);// Connection parametersconn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(-1);}// Publish messagepubmsg.payload = PAYLOAD;pubmsg.payloadlen = strlen(PAYLOAD);pubmsg.qos = QOS;pubmsg.retained = 0;MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);printf("Waiting for up to %d seconds for publication of %s\n""on topic %s for client with ClientID: %s\n",(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);printf("Message with delivery token %d delivered\n", token);// DisconnectMQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);return rc; }Eclipse Paho Java Client
Eclipse Paho Java Client 是用 Java 編寫的 MQTT 客戶端庫,可用于 JVM 或其他 Java 兼容平臺(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。
通過 Maven 安裝:
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version> </dependency>連接樣例代碼如下:
App.java
package io.emqx;import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://broker.emqx.io:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// Connection optionsMqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// Retain connectionconnOpts.setCleanSession(true);// Set callbackclient.setCallback(new PushCallback());// Setup connectionSystem.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// Publishclient.subscribe(subTopic);// Required parameters for publishing messageMqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}} }回調消息處理類 OnMessageCallback.java
package io.emqx;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// Reconnect after lost connection.System.out.println("Connection lost, and re-connect here.");}public void messageArrived(String topic, MqttMessage message) throws Exception {// Message handler after receiving messageSystem.out.println("Topic:" + topic);System.out.println("QoS:" + message.getQos());System.out.println("Payload:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());} }Eclipse Paho MQTT Go client
Eclipse Paho MQTT Go Client 為 Eclipse Paho 項目下的 Go 語言版客戶端庫,該庫能夠連接到 MQTT Broker 以發布消息,訂閱主題并接收已發布的消息,支持完全異步的操作模式。
客戶端依賴于 Google 的 proxy 和 websockets 軟件包,通過以下命令完成安裝:
go get github.com/eclipse/paho.mqtt.golang連接樣例代碼如下:
package mainimport ("fmt""log""os""time""github.com/eclipse/paho.mqtt.golang" )var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("TOPIC: %s\n", msg.Topic())fmt.Printf("MSG: %s\n", msg.Payload()) }func main() {mqtt.DEBUG = log.New(os.Stdout, "", 0)mqtt.ERROR = log.New(os.Stdout, "", 0)opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")opts.SetKeepAlive(60 * time.Second)// Message callback handleropts.SetDefaultPublishHandler(f)opts.SetPingTimeout(1 * time.Second)c := mqtt.NewClient(opts)if token := c.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}// Subscriptionif token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Publish messagetoken := c.Publish("testtopic/1", 0, false, "Hello World")token.Wait()time.Sleep(6 * time.Second)// Cancel subscriptionif token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Disconnectc.Disconnect(250)time.Sleep(1 * time.Second) }emqtt : EMQ 提供的 Erlang MQTT 客戶端庫
emqtt 是開源 MQTT Broker EMQ X 官方 EMQ 提供的客戶端庫,適用于 Erlang 語言。
Erlang 生態有多個 MQTT Broker 實現,如通過插件支持 MQTT 的 RabbitMQ ,VerenMQ、EMQ X 等。但是 MQTT 客戶端庫幾乎沒有選擇的余地,MQTT 社區收錄的 Erlang 客戶端庫中 emqtt 是最佳選擇。
emqtt 完全由 Erlang 實現,完成支持 MQTT v3.1.1 和 MQTT v5.0 協議版本,支持 SSL 單雙向認證與 WebSocket 連接。另一款 MQTT 基準測試工具 emqtt_bench 就基于該客戶端庫構建。
emqtt 使用方式如下:
ClientId = <<"test">>. {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]). {ok, _Props} = emqtt:connect(ConnPid). Topic = <<"guide/#">>. QoS = 1. {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}). {ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS). %% If the qos of publish packet is 0, `publish` function would not return packetid. ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).%% Recursively get messages from mail box. Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end. Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end. (Y(Rec))().%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell. Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end. Receive().{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).ok = emqtt:disconnect(ConnPid).MQTT.js Web 端 & Node.js 平臺 MQTT 客戶端
MQTT.js 是 JavaScript 編寫的,實現了 MQTT 協議客戶端功能的模塊,可以在 Node.js 或瀏覽器環境中使用。在 Node.js 中使用時,即可以 -g 全局安裝以命令行的形式使用,又可以將其集成到項目中調用。
由于 JavaScript 單線程特性,MQTT.js 是全異步 MQTT 客戶端,MQTT.js 支持 MQTT 與 MQTT over WebSocket,在不同運行環境支持程度如下:
- 瀏覽器環境:MQTT over WebSocket(包括微信小程序、支付寶小程序等定制瀏覽器環境)
- Node.js 環境:MQTT、MQTT over WebSocket
不同環境里除了少部分連接參數不同,其他 API 均是相同的。
使用 npm 安裝:
npm i mqtt使用 CDN 安裝(瀏覽器):
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script> <script>// Initialize a global mqtt variableconsole.log(mqtt) </script>樣例代碼:
// const mqtt = require('mqtt') import mqtt from 'mqtt'// Connection option const options = {clean: true, // Retain connectionconnectTimeout: 4000, // Timeout// AuthticationclientId: 'emqx_test',username: 'emqx_test',password: 'emqx_test', }// Connection string // ws: unsecured WebSocket // wss: secured WebSocket connection // mqtt: unsecured TCP connection // mqtts: secured TCP connection const connectUrl = 'wss://broker.emqx.io:8084/mqtt' const client = mqtt.connect(connectUrl, options)client.on('reconnect', (error) => {console.log('reconnect:', error) })client.on('reconnect', (error) => {console.log('reconnect:', error) })client.on('message', (topic, message) => {console.log('message:', topic, message.toString()) })Eclipse Paho Python
Eclipse Paho Python 為 Eclipse Paho 項目下的 Python 語言版客戶端庫,該庫能夠連接到 MQTT Broker 以發布消息,訂閱主題并接收已發布的消息。
使用 PyPi 包管理工具安裝:
pip install paho-mqtt代碼樣例:
import paho.mqtt.client as mqtt# Successful Connection Callback def on_connect(client, userdata, flags, rc):print('Connected with result code '+str(rc))client.subscribe('testtopic/#')# Message delivery callback def on_message(client, userdata, msg):print(msg.topic+" "+str(msg.payload))client = mqtt.Client()# Set callback handler client.on_connect = on_connect client.on_message = on_message# Set up connection client.connect('broker.emqx.io', 1883, 60) # Publish message client.publish('emqtt',payload='Hello World',qos=0)client.loop_forever()總結
關于 MQTT 協議、MQTT 客戶端庫使用流程、常用 MQTT 客戶端的簡介就到這里,歡迎讀者通過 EMQ X 進行MQTT 學習、項目開發使用。
總結
以上是生活随笔為你收集整理的常用 MQTT 客户端库简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cd JAVA系统找不到指定路径_SDK
- 下一篇: BAT批处理整人代码