Python paho-mqtt消息队列
官方文檔
https://docs.emqx.io/broker/v3/cn/
安裝
pip install paho-mqtt
?
服務(wù)接收測試:
mosquitto_sub -h 192.168.203.13 -u honey -P honey -t secevent/# -v mosquitto_sub -h 192.168.203.13 -u honey -P honey -t hserver/# -v客戶端代碼:
#!/usr/bin/env python # --*-- coding:UTF-8 --*--import time import sys import paho.mqtt.client as mqttmq_send = "secevent/P0001/events" mq_send_heartbeat = "hserver/P0001/status"def mq_init():def on_connect(client, userdata, flags, rc):logger.debug("mqtt connect:" + str(rc))if rc == 0:logger.info("Mq connect success.")else:logger.error("Mq connect faild.")def on_message(client, userdata, msg):logger.debug(msg.tocic + " " + str(msg.payload))try:global_par.mq = mqtt.Client()global_par.mq.loop_start()global_par.mq.on_connect = on_connectglobal_par.mq.on_message = on_messageglobal_par.mq.username_pw_set("honey", "honey")global_par.mq.connect("192.168.203.13", 1883)mq.publish(mq_send, "aaaaa", 2)mq.publish(mq_send_heartbeat, "bbbbb", 2, retain=True) #保留最后一條記錄except Exception as e:logger.error("Mq init error:{0}".format(e))?
一、Client模塊
與MQTT代理(broker)進(jìn)行通信的主要類。
(一)使用流程
- 使用connect()/connect_async()?連接MQTT代理
- 頻繁的調(diào)用loop()來維持與MQTT代理之間的流量?
- 或者使用loop_start()來設(shè)置一個(gè)線程為你調(diào)用loop()
- 或者在一個(gè)阻塞的函數(shù)中調(diào)用loop_forever()來為你調(diào)用loop()
- 使用subscribe()訂閱一個(gè)主題(topic)并接受消息(messages)
- 使用publish()來發(fā)送消息
- 使用disconnect()來斷開與MQTT代理的連接
(二)回調(diào)(Callbacks)
1.基本概念
使用回調(diào)處理從MQTT代理返回的數(shù)據(jù),要使用回調(diào)需要先定義回調(diào)函數(shù)然后將其指派給客戶端實(shí)例(client)。?
例如:
所有的回調(diào)函數(shù)都有client和userdata參數(shù)。?
client是調(diào)用回調(diào)的客戶端實(shí)例;?
userdata可以使任何類型的用戶數(shù)據(jù),可以在創(chuàng)建新客戶端實(shí)例時(shí)設(shè)置或者使用user_data_set(userdata)設(shè)置。
2.回調(diào)種類
(1)on_connect()
當(dāng)代理響應(yīng)連接請求時(shí)調(diào)用。?
on_connect(client, userdata, flags, rc):?
flags是一個(gè)包含代理回復(fù)的標(biāo)志的字典;?
rc的值決定了連接成功或者不成功:
| 0 | 連接成功 |
| 1 | 協(xié)議版本錯(cuò)誤 |
| 2 | 無效的客戶端標(biāo)識(shí) |
| 3 | 服務(wù)器無法使用 |
| 4 | 錯(cuò)誤的用戶名或密碼 |
| 5 | 未經(jīng)授權(quán) |
(2)on_disconnect()
當(dāng)與代理斷開連接時(shí)調(diào)用
on_disconnect(client, userdata, rc):rc參數(shù)表示斷開狀態(tài)。?
如果MQTT_ERR_SUCCESS(0),回調(diào)被調(diào)用以響應(yīng)disconnect()調(diào)用。 如果以任何其他值斷開連接是意外的,例如可能出現(xiàn)網(wǎng)絡(luò)錯(cuò)誤。
(3)on_message()
on_message(client, userdata, message):當(dāng)收到關(guān)于客戶訂閱的主題的消息時(shí)調(diào)用。?
message是一個(gè)描述所有消息參數(shù)的MQTTMessage。
(4)on_publish()
當(dāng)使用使用publish()發(fā)送的消息已經(jīng)傳輸?shù)酱頃r(shí)被調(diào)用。
對于Qos級別為1和2的消息,這意味著已經(jīng)完成了與代理的握手。?
對于Qos級別為0的消息,這只意味著消息離開了客戶端。?
mid變量與從相應(yīng)的publish()返回的mid變量匹配,以允許跟蹤傳出的消息。
此回調(diào)很重要,因?yàn)榧词筽ublish()調(diào)用返回,但并不總意味著消息已發(fā)送。
(5)on_subscribe()
當(dāng)代理響應(yīng)訂閱請求時(shí)被調(diào)用。
on_subscribe(client, userdata, mid, granted_qos):mid變量匹配從相應(yīng)的subscri?
be()返回的mid變量。?
‘granted_qos’變量是一個(gè)整數(shù)列表,它提供了代理為每個(gè)不同的訂閱請求授予的QoS級別。
(6)on_unsubscribe()
當(dāng)代理響應(yīng)取消訂閱請求時(shí)調(diào)用。
on_unsubscribe(client, userdata, mid):mid匹配從相應(yīng)的unsubscribe()調(diào)用返回的中間變量。
(7)on_log()
當(dāng)客戶端有日志信息時(shí)調(diào)用
on_log(client, userdata, level, buf):level變量給出了消息的嚴(yán)重性,并且將是MQTT_LOG_INFO,MQTT_LOG_NOTICE,MQTT_LOG_WARNING,MQTT_LOG_ERR和MQTT_LOG_DEBUG中的一個(gè)。?
buf變量用于存儲(chǔ)信息。
(三)方法
1.構(gòu)造函數(shù)Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")| client_id | 連接到代理時(shí)使用的唯一客戶端ID字符串。 如果client_id長度為零或無,則會(huì)隨機(jī)生成一個(gè)。 在這種情況下,clean_session參數(shù)必須為True。 |
| clean_session | 一個(gè)決定客戶端類型的布爾值。 如果為True,那么代理將在其斷開連接時(shí)刪除有關(guān)此客戶端的所有信息。 如果為False,則客戶端是持久客戶端,當(dāng)客戶端斷開連接時(shí),訂閱信息和排隊(duì)消息將被保留。 |
| userdata | 用戶定義的任何類型的數(shù)據(jù)作為userdata參數(shù)傳遞給回調(diào)函數(shù)。 它可能會(huì)在稍后使用user_data_set()函數(shù)進(jìn)行更新。 |
| protocol | 用于此客戶端的MQTT協(xié)議的版本。 可以是MQTTv31或MQTTv311。 |
| transport | 設(shè)置為“websockets”通過WebSockets發(fā)送MQTT。 保留默認(rèn)的“tcp”使用原始TCP。 |
示例:
import paho.mqtt.client as mqtt client = mqtt.Client()2.reinitialise()
reinitialise(client_id="", clean_session=True, userdata=None)reinitialise()函數(shù)將客戶端重置為其開始狀態(tài),就像它剛剛創(chuàng)建一樣。 它采用與Client()構(gòu)造函數(shù)相同的參數(shù)。?
示例:
3.選項(xiàng)函數(shù)
這些函數(shù)表示可以在客戶端上設(shè)置以修改其行為的選項(xiàng)。 在大多數(shù)情況下,這必須在連接到代理之前完成。
(1)max_inflight_messages_set()
max_inflight_messages_set(self, inflight)設(shè)置QoS> 0的消息的最大數(shù)量,該消息一次可以部分通過其網(wǎng)絡(luò)流量。默認(rèn)為20.增加此值將消耗更多內(nèi)存,但可以增加吞吐量。
(2)max_queued_messages_set()
max_queued_messages_set(self, queue_size)設(shè)置傳出消息隊(duì)列中可等待處理的具有QoS> 0的傳出消息的最大數(shù)量。默認(rèn)為0表示無限制。 當(dāng)隊(duì)列已滿時(shí),任何其他傳出的消息都將被丟棄。
(3)message_retry_set()
message_retry_set(retry)如果代理沒有響應(yīng),設(shè)置在重發(fā)QoS> 0的消息之前以秒為單位的時(shí)間。默認(rèn)設(shè)置為5秒,通常不需要更改。
(4)ws_set_options()
ws_set_options(self, path="/mqtt", headers=None)設(shè)置websocket連接選項(xiàng)。 只有在transport =“websockets”被傳入Client()構(gòu)造函數(shù)時(shí)才會(huì)使用這些選項(xiàng)。
| path | 代理使用的mqtt路徑 |
| headers | 可以是一個(gè)字典,指定應(yīng)該附加到標(biāo)準(zhǔn)websocket頭部的額外頭部列表,也可以是可調(diào)用的正常websocket頭部并返回帶有一組頭部以連接到代理的新字典。 |
必須在調(diào)用connect()之前調(diào)用。
(4)tls_set()
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,tls_version=ssl.PROTOCOL_TLS, ciphers=None)配置網(wǎng)絡(luò)加密和身份驗(yàn)證選項(xiàng)。 啟用SSL / TLS支持。
| ca_certs | 證書頒發(fā)機(jī)構(gòu)證書文件的字符串路徑,該證書文件將被視為受此客戶端信任。 |
| certfile, keyfile | 分別指向PEM編碼的客戶端證書和私鑰的字符串。 如果這些參數(shù)不是None,那么它們將用作基于TLS的身份驗(yàn)證的客戶端信息。 對此功能的支持取決于代理。 |
| cert_reqs | 定義客戶對經(jīng)紀(jì)人施加的證書要求。 默認(rèn)情況下,這是ssl.CERT_REQUIRED,這意味著代理必須提供證書。 有關(guān)此參數(shù)的更多信息,請參閱ssl pydoc。 |
| tls_version | 指定要使用的SSL / TLS協(xié)議的版本。 默認(rèn)情況下(如果python版本支持它),檢測到最高的TLS版本。 |
| ciphers | 指定哪些加密密碼可供此連接使用的字符串,或者使用None來使用默認(rèn)值。 有關(guān)更多信息,請參閱ssl pydoc。 |
必須在調(diào)用connect()之前調(diào)用。
(5)tls_set_context()
配置網(wǎng)絡(luò)加密和認(rèn)證上下文。 啟用SSL / TLS支持。
tls_set_context(context=None)| context | 一個(gè)ssl.SSLContext對象。 |
必須在調(diào)用connect()之前調(diào)用。
(6)tls_insecure_set()
配置服務(wù)器證書中服務(wù)器主機(jī)名的驗(yàn)證。
tls_insecure_set(value)如果value設(shè)置為True,則不可能保證您連接的主機(jī)不模擬您的服務(wù)器。 這在初始服務(wù)器測試中可能很有用,但是,惡意的第三方通過可以DNS欺騙模擬您的服務(wù)器。
- 請勿在真實(shí)系統(tǒng)中使用此功能。 將值設(shè)置為True意味著使用加密沒有意義。
- 必須在connect*()之前和tls_set()或tls_set_context()之后調(diào)用。
(7)enable_logger()
使用標(biāo)準(zhǔn)的Python日志包啟用日志記錄。 這可以與on_log回調(diào)方法同時(shí)使用
enable_logger(logger=None)如果指定了記錄器,那么將使用該logging.Logger對象,否則將自動(dòng)創(chuàng)建一個(gè)。?
按照以下映射將Paho日志記錄級別轉(zhuǎn)換為標(biāo)準(zhǔn)日志級別:
| MQTT_LOG_ERR | ligging.ERROR |
| MQTT_LOG_WARNING | logging.WARNING |
| MQTT_LOG_NOTICE | logging.INFO (no direct equivalent) |
| MQTT_LOG_INFO | logging.INFO |
| MQTT_LOG_DEBUG | logging.DEBUG |
(8)disable_logger()
使用標(biāo)準(zhǔn)python日志包禁用日志記錄。 這對on_log回調(diào)沒有影響。
disable_logger()(9)username_pw_set()
為代理認(rèn)證設(shè)置一個(gè)用戶名和一個(gè)可選的密碼。必須在connect*()之前調(diào)用。
username_pw_set(username, password=None)(10)user_data_set()
設(shè)置在生成事件時(shí)將傳遞給回調(diào)的私人用戶數(shù)據(jù)。 將其用于您自己的目的以支持您的應(yīng)用程序。
user_data_set(userdata)(11)will_set()
設(shè)置要發(fā)送給代理的遺囑。 如果客戶端斷開而沒有調(diào)用disconnect(),代理將代表它發(fā)布消息。
will_set(topic, payload=None, qos=0, retain=False)| topic | 該遺囑消息發(fā)布的主題 |
| payload | 該消息將作為遺囑發(fā)送 |
| qos | 用于遺囑的服務(wù)質(zhì)量等級 |
| retain | 如果設(shè)置為True,遺囑消息將被設(shè)置為該主題的“最后已知良好”/保留消息。 |
如果qos不是0,1或2,或者主題為None或字符串長度為零,則引發(fā)ValueError。
(11)reconnect_delay_set()
客戶端將自動(dòng)重試連接。 在每次嘗試之間,它會(huì)在min_delay和max_delay之間等待幾秒鐘。
reconnect_delay_set(min_delay=1, max_delay=120)當(dāng)連接丟失時(shí),最初重新連接嘗試延遲min_delay秒。 延遲在隨后的嘗試到中增加一倍。當(dāng)連接完成時(shí)(例如收到CONNACK,而不僅僅是TCP連接建立),延遲重置為min_delay。
4.connect()
connect()函數(shù)將客戶端連接到代理。 這是一個(gè)阻塞函數(shù)。
connect(host, port=1883, keepalive=60, bind_address="")| host | 遠(yuǎn)程代理的主機(jī)名或IP地址 |
| port | 要連接的服務(wù)器主機(jī)的網(wǎng)絡(luò)端口。 默認(rèn)為1883 |
| keepalive | 與代理通信之間允許的最長時(shí)間段(以秒為單位)。 如果沒有其他消息正在交換,則它將控制客戶端向代理發(fā)送ping消息的速率 |
| bind_address | 假設(shè)存在多個(gè)接口,將綁定此客戶端的本地網(wǎng)絡(luò)接口的IP地址 |
5.connect_async()
與loop_start()一起使用以非阻塞方式連接。 直到調(diào)用loop_start()之前,連接才會(huì)完成。
connect_async(host, port=1883, keepalive=60, bind_address="")6.connect_srv()
使用SRV DNS查找連接到代理以獲取代理地址。
connect_srv(domain, keepalive=60, bind_address="")| domain | 該DNS域搜索SRV記錄。 如果無,請嘗試確定本地域名。 |
7.reconnect()
使用先前提供的詳細(xì)信息重新連接到經(jīng)紀(jì)商。 在調(diào)用此函數(shù)之前,您必須先調(diào)用connect *()。
reconnect()8.disconnect()
干凈地從代理斷開連接。 使用disconnect()不會(huì)導(dǎo)致代理發(fā)送遺囑消息。
disconnect()9.loop()
定期調(diào)用處理網(wǎng)絡(luò)事件。
loop(timeout=1.0, max_packets=1)此調(diào)用在select()中等待,直到網(wǎng)絡(luò)套接字可用于讀取或?qū)懭?#xff08;如果適用),然后處理傳入/傳出數(shù)據(jù)。
| timeout | 此方法最多可阻塞timeout秒 |
| max_packets | max_packets參數(shù)已過時(shí),應(yīng)保留為未設(shè)置狀態(tài)。 |
示例:
run = True while run:client.loop()10.loop_start() / loop_stop()
這些功能實(shí)現(xiàn)了到網(wǎng)絡(luò)循環(huán)的線程接口。
loop_start() loop_stop(force=False)在connect*()之前或之后調(diào)用loop_start()一次,會(huì)在后臺(tái)運(yùn)行一個(gè)線程來自動(dòng)調(diào)用loop()。這釋放了可能阻塞的其他工作的主線程。這個(gè)調(diào)用也處理重新連接到代理。?
調(diào)用loop_stop()來停止后臺(tái)線程。
force參數(shù)目前被忽略。?
示例:
11.loop_forever()
這是網(wǎng)絡(luò)循環(huán)的阻塞形式,直到客戶端調(diào)用disconnect()時(shí)才會(huì)返回。它會(huì)自動(dòng)處理重新連接。
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)除了使用connect_async時(shí)的第一次連接嘗試以外,請使用retry_first_connection = True使其重試第一個(gè)連接。這可能會(huì)導(dǎo)致客戶端連接到一個(gè)不存在的主機(jī)的情況。
12.publish()
從客戶端發(fā)送消息給代理。
publish(topic, payload=None, qos=0, retain=False)消息將會(huì)發(fā)送給代理,并隨后從代理發(fā)送到訂閱匹配主題的任何客戶端。
| topic | 該消息發(fā)布的主題 |
| payload | 要發(fā)送的實(shí)際消息。如果沒有給出,或設(shè)置為無,則將使用零長度消息。 傳遞int或float將導(dǎo)致有效負(fù)載轉(zhuǎn)換為表示該數(shù)字的字符串。 如果你想發(fā)送一個(gè)真正的int / float,使用struct.pack()來創(chuàng)建你需要的負(fù)載 |
| qos | 服務(wù)的質(zhì)量級別 |
| retain | 如果設(shè)置為True,則該消息將被設(shè)置為該主題的“最后已知良好”/保留的消息 |
返回以下屬性和方法的MQTTMessageInfo:?
rc:發(fā)布的結(jié)果。
| MQTT_ERR_SUCCESS | 成功 |
| MQTT_ERR_NO_CONN | 客戶端當(dāng)前未連接 |
| MQTT_ERR_QUEUE_SIZE | 當(dāng)使用max_queued_messages_set來指示消息既不排隊(duì)也不發(fā)送。 |
mid:發(fā)布請求的消息ID。?
如果mid已定義,則可以通過檢查on_publish()回調(diào)中的mid來跟蹤發(fā)布請求。?
wait_for_publish():函數(shù)將阻塞,直到消息發(fā)布。 如果消息未排隊(duì)(rc == MQTT_ERR_QUEUE_SIZE),它將引發(fā)ValueError。?
is_published:如果消息已發(fā)布,is_published返回True。 如果消息未排隊(duì)(rc == MQTT_ERR_QUEUE_SIZE),它將引發(fā)ValueError。
如果主題為無,長度為零或無效(包含通配符),qos不是0,1或2之一,或者有效負(fù)載長度大于268435455字節(jié),則會(huì)引發(fā)ValueError。
13.subscribe()
subscribe(topic, qos=0)訂閱一個(gè)或多個(gè)主題。?
這個(gè)函數(shù)可以用三種不同的方式調(diào)用:
(1)簡單的字符串和整數(shù)
subscribe("my/topic", 2)| topic | 一個(gè)字符串,指定要訂閱的訂閱主題 |
| qos | 期望的服務(wù)質(zhì)量等級。 默認(rèn)為0。 |
(2)字符串和整數(shù)元組
subscribe(("my/topic", 1))| topic | (topic,qos)的元組。 主題和qos都必須存在于元組中。 |
| qos | 沒有使用 |
(3)字符串和整數(shù)元組的列表
這允許在單個(gè)SUBSCRIPTION命令中使用多個(gè)主題訂閱,這比使用多個(gè)訂閱subscribe()更有效。
subscribe([("my/topic", 0), ("another/topic", 2)])| topic | 格式元組列表(topic,qos)。 topic和qos都必須出現(xiàn)在所有的元組中。 |
| qos | 沒有使用 |
該函數(shù)返回一個(gè)元組(result,mid)。
如果qos不是0,1或2,或者主題為None或字符串長度為零,或者topic不是字符串,元組或列表,則引發(fā)ValueError。
14.unsubscribe()
取消訂閱一個(gè)或多個(gè)主題。
unsubscribe(topic)| topic | 主題的單個(gè)字符串或者字符串列表 |
返回一個(gè)元組(result, mid)
15.外部事件循環(huán)支持
(1)loop_read()
loop_read(max_packets=1)當(dāng)套接字準(zhǔn)備好讀取時(shí)調(diào)用。 max_packets已過時(shí),應(yīng)保持未設(shè)置狀態(tài)。
(2)loop_write()
loop_write(max_packets=1)當(dāng)套接字準(zhǔn)備好寫入時(shí)調(diào)用。 max_packets已過時(shí),應(yīng)保持未設(shè)置狀態(tài)。
(3)loop_misc()
loop_misc()每隔幾秒呼叫一次以處理消息重試和ping。
(4)socket()
socket()返回客戶端中使用的套接字對象,以允許與其他事件循環(huán)進(jìn)行交互。
(5)want_write()
want_write()如果有數(shù)據(jù)等待寫入,則返回true,以允許將客戶端與其他事件循環(huán)連接。
16.全局輔助函數(shù)
client模塊還提供了一些全局幫助函數(shù)。?
(1)topic_matches_sub(sub,topic)可用于檢查主題是否與預(yù)訂匹配。?
(2)connack_string(connack_code)返回與CONNACK結(jié)果關(guān)聯(lián)的錯(cuò)誤字符串。?
(3)error_string(mqtt_errno)返回與Paho MQTT錯(cuò)誤號關(guān)聯(lián)的錯(cuò)誤字符串。
?
二、Publish模塊
該模塊提供了一些幫助功能,可以以一次性方式直接發(fā)布消息。換句話說,它們對于您想要發(fā)布給代理的單個(gè)/多個(gè)消息然后斷開與其他任何必需的連接的情況非常有用。
(一)方法
1.Single
將一條消息發(fā)布給代理,然后徹底斷開連接。
single(topic, payload=None, qos=0, retain=False, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311, transport="tcp")| topic | 唯一必需的參數(shù)必須是負(fù)載將發(fā)布到的主題字符串。 |
| payload | 要發(fā)布的有效載荷。 如果“”或None,零長度的有效載荷將被發(fā)布 |
| qos | 發(fā)布時(shí)使用的qos默認(rèn)為0 |
| retain | 設(shè)置消息保留(True)或不(False) |
| hostname | 一個(gè)包含要連接的代理地址的字符串。 默認(rèn)為localhost |
| port | 要連接到代理的端口。 默認(rèn)為1883 |
| client_id | 要使用的MQTT客戶端ID。 如果“”或None,Paho庫會(huì)自動(dòng)生成客戶端ID |
| keepalive | 客戶端的存活超時(shí)值。 默認(rèn)為60秒 |
| will | 一個(gè)包含客戶端遺囑參數(shù)的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}. |
| auth | 一個(gè)包含客戶端驗(yàn)證參數(shù)的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”} |
| tls | 一個(gè)包含客戶端的TLS配置參數(shù)的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>} |
| protocol | 選擇要使用的MQTT協(xié)議的版本。 使用MQTTv31或MQTTv311。 |
| transport | 設(shè)置為“websockets”通過WebSockets發(fā)送MQTT。 保留默認(rèn)的“tcp”使用原始TCP。 |
示例:
import paho.mqtt.publish as publishpublish.single("paho/test/single", "payload", hostname="iot.eclipse.org")2.Multiple
將多條消息發(fā)布給代理,然后干凈地?cái)嚅_連接。
multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")| msgs | 要發(fā)布的消息列表。 每條消息是一個(gè)字典或一個(gè)元組。msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>, ‘retain’:<retain>}或(“<topic>”, “<payload>”, qos, retain) |
有關(guān)hostname,port,client_id,keepalive,will,auth,tls,protocol,transport的描述,請參閱single()。?
示例:
三、Subscribe模塊
該模塊提供了一些幫助功能,以允許直接訂閱和處理消息。
(1)方法
1.Simple
訂閱一組主題并返回收到的消息。 這是一個(gè)阻塞函數(shù)。
simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311)| topics | 唯一需要的參數(shù)是客戶端將訂閱的主題字符串。 如果需要訂閱多個(gè)主題,這可以是字符串或字符串列表 |
| qos | 訂閱時(shí)使用的qos默認(rèn)為0 |
| msg_count | 從代理檢索的消息數(shù)量。 默認(rèn)為1.如果為1,則返回一個(gè)MQTTMessage對象。 如果> 1,則返回MQTTMessages列表 |
| retained | 設(shè)置為True以考慮保留的消息,將其設(shè)置為False以忽略具有保留標(biāo)志設(shè)置的消息 |
| hostname | 一個(gè)包含要連接的代理地址的字符串。 默認(rèn)為localhost |
| port | 要連接到代理的端口。 默認(rèn)為1883 |
| client_id | 要使用的MQTT客戶端ID。 如果“”或None,Paho庫會(huì)自動(dòng)生成客戶端ID |
| keepalive | 客戶端的存活超時(shí)值。 默認(rèn)為60秒。 |
| will | 一個(gè)包含客戶端遺囑參數(shù)的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}. |
| auth | 一個(gè)包含客戶端驗(yàn)證參數(shù)的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”} |
| tls | 一個(gè)包含客戶端的TLS配置參數(shù)的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>} |
| protocol | 選擇要使用的MQTT協(xié)議的版本。 使用MQTTv31或MQTTv311。 |
2.Callback
訂閱一組主題并使用用戶提供的回叫處理收到的消息。
callback(callback, topics, qos=0, userdata=None, hostname="localhost",port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,protocol=mqtt.MQTTv311)| callback | 一個(gè)“on_message”回調(diào)將被用于每個(gè)收到的消息 |
| topics | 客戶端將訂閱的主題字符串。 如果需要訂閱多個(gè)主題,這可以是字符串或字符串列表。 |
| qos | 訂閱時(shí)使用的qos默認(rèn)為0 |
| userdata | 用戶提供的對象將在收到消息時(shí)傳遞給on_message回調(diào)函數(shù) |
有關(guān)hostname,port,client_id,keepalive,will,auth,tls,protocol的描述,請參閱simple()。?
示例:
參考資料:https://pypi.python.org/pypi/paho-mqtt
總結(jié)
以上是生活随笔為你收集整理的Python paho-mqtt消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用Nmap对MS-SQLSERVER进
- 下一篇: 挖洞经验:通过Vimeo的文件上传功能发