使用 Python MQTT 客户端 Paho-MQTT 的初学者指南
Paho-MQTT是由Eclipse基金會開發的開源Python MQTT客戶端。Paho-MQTT可以在任何支持Python的設備上運行。在本教程中,我們將使用 Paho 構建一個 MQTT 客戶端。我將把庫的每個功能添加到客戶端程序中,并解釋它是如何工作的。在本教程結束時,您將對庫的工作原理有一個基本的了解。
如果您不熟悉 MQTT,最好先學習我的上一篇《MQTT基礎知識及工作原理》
0. 安裝 Python MQTT 客戶端 Paho-MQTT
Paho MQTT 需要 Python 版本 3.4+。要進行安裝,請打開主機或終端,然后輸入:
pip install paho-mqtt1. 建立與 MQTT 代理的連接
本教程將使用 Eclipse 提供的公共 MQTT 代理。
警告: 不要在生產環境中使用公有 MQTT 代理。
要在PC上運行代理,您可以安裝mosquitto:
在Windows和Ubuntu/Debian上安裝Mosquitto
建立與 MQTT 代理的連接
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port)這是怎么回事?
1883是 MQTT 中所有未加密連接的缺省端口號。
我們創建一個 MQTT 客戶端對象并調用它。我們將在下一節中看到有關 paho 客戶端對象的更多信息。client
接下來,我們使用代理的地址和端口號調用函數。connect()
如果連接成功,該函數將返回 0。connect()
讓我們分解一下客戶端對象:
1.1 客戶端對象
該對象創建 MQTT 客戶機。它需要4個可選參數:client()
client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")client_id是客戶端在連接到代理時給出的唯一字符串。如果您未提供客戶端 ID,代理將向客戶端分配一個客戶端 ID。
每個客戶端必須具有唯一的客戶端 ID。代理使用客戶機 ID 來唯一標識每個用戶。如果使用相同的客戶端 ID 連接第二個客戶端,則第一個客戶端將斷開連接。
clean_session是默認設置為True的布爾值。
如果設置為False,則代理存儲有關客戶端的信息。
如果設置為True,代理將刪除有關客戶端的所有存儲信息。
要了解干凈會話,請參閱: MQTT 中的干凈會話說明
userdata是可以作為參數發送到回調的數據。有關回調的更多信息,請參見第 4 節。
protocol可以是其中之一,也可以取決于您要使用的版本。MQTTv31MQTTv311
transport缺省值為 。如果要通過WebSockets發送消息,請設置為 。tcpwebsockets
為了干凈地斷開與代理的連接,我們可以使用函數。disconnect()
2. 發布消息
要發布消息,我們使用該函數。該函數采用 4 個參數:publish()
publish(topic, payload=None, qos=0, retain=False)topic是包含主題名稱的字符串。
主題名稱區分大小寫。
payload是一個字符串,其中包含將發布到主題的消息。訂閱該主題的任何客戶端都將看到有效負載消息。
這些是可選參數:
qos為 0、1 或 2。它默認為 0。服務質量是保證收到消息的級別。
要了解不同的 MQTT 服務級別質量,請參閱此帖子
: MQTT QoS 級別說明
retain是默認為False的布爾值。如果設置為 True,則它告訴代理將該主題上的該消息存儲為"最后一條好消息"。
要了解 MQTT 中的消息保留,請參閱以下內容:MQTT 保留的消息
將發布函數添加到我們的代碼中:
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.publish(topic="TestingTopic", payload="TestingPayload", qos=0, retain=False)要檢查消息是否已成功發布到某個主題,我們需要一個訂閱該主題的客戶端。## 3. 訂閱主題要訂閱主題,我們需要該函數。該函數采用 2 個參數subscribe()```powershell subscribe("topicName", qos=0)topic是包含主題名稱的字符串。
注意:主題名稱區分大小寫。
qos為 0、1 或 2。它將降級為 0。
讓我們修改我們的程序以訂閱要發布到的主題:
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) 注意:如果您希望客戶端訂閱多個主題,則可以將它們放在元組列表中。示例:client.subscribe([('topicName1', 1),('topicName2', 1)])元組的格式為 [(主題名稱,QoS 級別)]當您執行此程序時,您會注意到已發布的消息仍未顯示在控制臺/終端上。
訂閱某個主題會告訴代理向您發送發布到該主題的消息。我們已經訂閱了該主題,但我們需要一個回調函數來處理這些消息。
回調是在事件發生時執行的函數。在 paho 中,這些事件包括連接、斷開連接、訂閱、取消訂閱、發布、接收消息、日志記錄。
在對程序實現回調之前,我們需要首先了解程序如何調用這些回調。為此,我們將使用 paho 中可用的不同循環函數。
4.1 循環函數
循環是客戶端對象的函數。當客戶端收到消息時,該消息將存儲在接收緩沖區中。當要將消息從客戶端發送到代理時,該消息將存儲在發送緩沖區中。循環函數用于處理緩沖區中的任何消息并調用相應的回調函數。它們還將嘗試在斷開連接時重新連接到代理。
大多數循環函數都是異步運行的,這意味著當調用循環函數時,它將在單獨的線程上運行。
有 3 種類型的循環函數:
4.1.1 loop_forever():這是一個阻塞循環函數。這意味著循環函數將繼續運行,并且在調用此函數后無法執行任何其他行。如果要無限期地運行程序,并且程序中有單個客戶端構造函數,請使用此功能。
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()4.1.2 loop_start()/loop_stop(): 這是一個非阻塞循環函數,這意味著你可以調用這個函數,并在函數調用后繼續執行代碼。顧名思義,loop_start() 用于啟動循環函數,loop_stop() 用于停止循環函數。如果需要在同一程序中創建多個客戶端對象,則可以使用 loop_start()。
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_start() #Some Executable Code Here client.loop_stop() #Client Loop Stops After loop_stop() is called4.1.3 loop():這是一個阻塞循環函數。loop() 和 loop_forever() 之間的區別在于,如果調用 loop() 函數,則必須手動處理重新連接,這與后者不同。loop_forever() & loop_start() 函數將在斷開連接時自動嘗試重新連接到代理。除非在特殊情況下,否則不建議使用 loop()。
現在回到回調。在下一節中,我們將在程序中實現每個回調。
4.2 on_connect
每次客戶端連接/重新連接到代理時都會調用回調。讓我們將回調添加到我們的程序中。on_connect()
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code: {}".format(rc)) def on_disconnect(client, userdata, rc):print("Client Got Disconnected") client = mqtt.Client() client.on_connect = on_connect client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()必須將回調分配給客戶端對象。如果不是,則回調將不會執行
這是怎么回事?
我們創建on_connect回調函數。它需要4個參數:客戶端對象,用戶數據,標志,rc。
對象。client
userdata是在客戶端構造函數中聲明的自定義數據。如果要將自定義數據傳遞到回調中,則需要它。
flags是一個字典對象,用于檢查是否已將客戶端對象的 clean 會話設置為 True 或 False。
rc這是result code,用于檢查連接狀態。不同的結果代碼是:
0:連接成功 1:連接被拒絕 – 協議不正確 版本 2:連接被拒絕 – 客戶端標識符無效 3:連接被拒絕 – 服務器不可用 4:連接被拒絕 – 用戶名或密碼錯誤 5:連接被拒絕 – 未授權 6-255:當前未使用。
該程序的輸出是:
Connected With Result Code 0
這意味著連接成功。
4.3 on_disconnect
當客戶端與代理斷開連接時,將調用回調。on_disconnect()
不要忘記將回調函數分配給客戶端對象!
client.on_disconnect = on_disconnect
運行該程序,斷開互聯網連接,然后查看是否打印了消息"客戶端已斷開連接"。此外,重新連接到互聯網以查看客戶端是否重新連接到代理。
可以將多個客戶端對象附加到單個回調函數。當您的程序連接到多個代理時,這很有用。
4.4 on_message
回到盡管訂閱了主題但仍未顯示消息的問題:回調用于處理發布到已訂閱主題的消息。on_message()
在我們的程序中,我們需要做3件事:1.
訂閱我們要發布的主題。
2. 使用回調處理已發布的消息。在我們的程序中,我們將簡單地打印消息。
3. 將回調函數分配給客戶端對象的屬性。on_message
讓我們創建另一個 mqtt 客戶端。此新程序將向已訂閱現有程序的主題發布消息。
sub.py(接收客戶端)
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code "+rc) def on_message(client, userdata, message):print("Message Recieved: "+message.payload.decode()) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False) client.loop_forever()pub.py(發布客戶端)
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 client = mqtt.Client() client.connect(broker_url, broker_port) client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)on_message() 回調有 3 個參數:、 & 。我已經解釋了本節中的前兩個。
該對象有 4 個屬性:、 、 、 。clientuserdatamessageon_connect()messagetopicpayloadqosretain
如果您注意到對象的每個屬性也是我們在客戶端函數中使用的參數。messagepublish()
先運行,然后運行 。輸出將顯示消息和主題。sub.pypub.py
如果郵件未打印出來,請檢查是否:
您已將該函數添加到客戶端對象的屬性中。on_message()
如果您已訂閱 了 中的回調中的主題。on_connect()sub.py
如果已分別在 和 函數中正確輸入主題名稱。subscribe()publish()sub.pypub.py
如果要在函數中打印消息。on_message()
如何組織和處理消息?
如果您從不同的主題收到多條消息,并且需要以不同的方式處理每個主題的消息,那么我們必須對消息進行排序。
執行此操作的一種方法是使用該屬性檢查將消息發布到哪個主題。然后,您可以創建 if 條件來相應地處理消息。message.topic
更好的方法是使用對象的功能。這將創建多個回調來處理來自不同主題的消息。
參數采用主題名稱,參數采用將處理該主題消息的回調的名稱。message_callback_add(sub, callback)clientsubcallback
例:
sub.py
import paho.mqtt.client as mqtt broker_url = "mqtt.eclipse.org" broker_port = 1883 def on_connect(client, userdata, flags, rc):print("Connected With Result Code " (rc)) def on_message_from_kitchen(client, userdata, message):print("Message Recieved from Kitchen: "+message.payload.decode()) def on_message_from_bedroom(client, userdata, message):print("Message Recieved from Bedroom: "+message.payload.decode()) def on_message(client, userdata, message):print("Message Recieved from Others: "+message.payload.decode()) client = mqtt.Client() client.on_connect = on_connect #To Process Every Other Message client.on_message = on_message client.connect(broker_url, broker_port) client.subscribe("TestingTopic", qos=1) client.subscribe("KitchenTopic", qos=1) client.subscribe("BedroomTopic", qos=1) client.message_callback_add("KitchenTopic", on_message_from_kitchen) client.message_callback_add("BedroomTopic", on_message_from_bedroom) client.loop_forever()在向其添加回調之前,請確保您已訂閱這兩個主題。
默認情況下,來自其他主題的消息將由 on_message() 處理
您可以通過發布到"KitchenTopic"和"臥室主題"并查看它是否單獨處理來嘗試上述程序。
4.5 on_publish
執行函數時調用該函數。這將返回一個元組 。on_publish()publish()(result, mid)
result是錯誤代碼。錯誤代碼 0 表示消息已成功發布。
mid代表消息 ID。它是一個整數,是客戶端分配的唯一消息標識符。如果使用 QoS 級別 1 或 2,則客戶端循環將使用 標識尚未發送的消息。mid
4.6 on_subscribe()/on_unsubscribe()
當客戶端訂閱主題時,將調用回調。例:on_subscribe()on_subscribe(client, userdata, mid, granted_qos)
當客戶端取消訂閱某個主題時,將調用回調。例:on_unsubscribe()on_unsubscribe(client, userdata, mid)
mid是函數中討論的消息 id。on_publish()
granted_qos是訂閱時該主題的 qos 級別。如果需要,您可以在程序中嘗試一下。
5. 其他有用的 Paho-MQTT 功能
Paho還有一些有用的函數,可以在程序中使用它們來執行函數,而不必創建客戶端構造函數并經歷創建回調的麻煩。
5.1 單() / 多() 發布
單個或多個函數用于將單個消息或多條消息發布到代理上的主題,而無需創建客戶機對象。
single(topic, payload=None, qos=0, retain=False, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")topic是唯一必需的參數。這是一個包含主題名稱的字符串。
auth是包含用戶名和密碼的字典(如果代理需要)。(eclipse 代理不需要身份驗證)。
例:auth = {‘username’:“username”, ‘password’:""}
tls如果我們使用 TLS/SSL 加密,則需要。
例:dict = {‘ca_certs’:"", ‘certfile’:"", ‘keyfile’:"", ‘tls_version’:"", ‘ciphers’:"<ciphers">}
transport接受 2 個值:和 。如果您不想使用websockets,請將其設置為 。websocketstcptcp
其他參數類似于客戶端對象中的參數。
multiple(msgs, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")msgs是必需的參數。它包含要發布的消息的列表。每條消息都可以是字典或元組。
字典必須采用以下格式:
msg = {‘topic’:"< topicname >", ‘payload’:"", ‘qos’:‘0’, ‘retain’:‘False’}
元組必須采用以下格式:
("< topicname >", “< payload >”, qos, retain)
在這兩種格式中,名稱都必須存在。topic
5.2 簡單() / 回調()
simple 是一個阻止函數,它訂閱一個主題或一個主題列表,并返回發布到該主題的消息。
simple(topics, qos=0, msg_count=1, retained=False, hostname=“localhost”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)
topics是唯一必需的參數。這可以是單個主題的字符串,也可以是多個主題的列表。
msg_count是在斷開與代理的連接之前必須返回的消息數。對于 msg_count > 1,該函數將返回消息列表。
其他參數已經在 single() / multiple() 部分中進行了解釋。
callback與 相似,唯一的區別是它需要一個額外的參數,即回調。簡單函數僅返回來自主題的消息,回調函數將返回的消息發送到任何函數進行處理。simple
callback(callback, topics, qos=0, userdata=None, hostname=“iot.eclipse.org”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)
6. Will_set()
這是一個非常有用的功能。當客戶端連接到代理時,它會告訴代理,如果它斷開連接,它必須向主題發布消息。它是客戶端構造函數的一個屬性。
client.will_set(topic, payload=None, qos=0, retain=False)
要了解如何以及何時在MQTT中使用Last Will & Testament,請參閱這篇文章
:MQTT Last Will and Testament(用示例解釋)
您的反饋對我來說非常重要。如果本教程對您有所幫助,或者,如果文章中的某些部分有錯誤或解釋不正確,請在下面的評論中告訴我。
總結
以上是生活随笔為你收集整理的使用 Python MQTT 客户端 Paho-MQTT 的初学者指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 论文参考文献格式标准~收藏
- 下一篇: 计算机类参考文献 期刊,期刊参考文献标准