mqtt异步publish方法
Python基于mqtt異步編程主要用到asyncio及第三方庫hbmqtt,這里主要介紹mqtt的異步發(fā)布及遇到的一些問題。
hbmqtt安裝很簡單,pip hbmqtt install.
mqtt服務(wù)器我使用的是mosquitto.
1、主進(jìn)程
主進(jìn)程執(zhí)行下面語句就實(shí)現(xiàn)了協(xié)程
1 loop = asyncio.get_event_loop() 2 loop.run_until_complete(run())首先是connect,然后publish,整個過程是一個協(xié)程
1 run(): #協(xié)程主函數(shù) 2 await connect() 3 while True: 4 try: 5 await publish() 6 except Exception as ce: 7 logger.error("Sender Error: %s" % ce)2、Connect
這里connect沒有使用自動重連機(jī)制connect(),而是單獨(dú)開一個協(xié)程執(zhí)行掉線后自動重連_auto_reconnect
重連函數(shù),每隔1s執(zhí)行一次:
1 async def _auto_reconnect(client): 2 while True: 3 if not client.session.transitions.is_connected(): #如果已經(jīng)連接上則不執(zhí)行重連 4 try: 5 await client.reconnect() 6 except ConnectException: 7 pass 8 await asyncio.sleep(1)連接函數(shù),僅執(zhí)行一次,并啟動重連函數(shù)協(xié)程
1 async def connect(): 2 client = MQTTClient(config={"auto_reconnect": False}) #False時關(guān)閉自動重連 3 try: 4 await client.connect(url) 5 except ConnectException: 6 pass 7 asyncio.ensure_future(_auto_reconnect(client))3、Publish:
發(fā)布函數(shù)比較簡單,主要就是
1 await client.publish(topic, msg)4、總結(jié)
這里重點(diǎn)講為什么協(xié)程時不要自動重連,因為如果publish過程中出現(xiàn)斷線,需要等待連接成功的event,如果允許自動重連"auto_reconnect": True,程序在publish程序等待信號不退出,無法進(jìn)入connect程序執(zhí)行reconnect,這樣就永遠(yuǎn)等不到信號,造成程序死等,類似死機(jī)。
如果不允許自動重連,單開一個協(xié)程執(zhí)行重連操作,即使publish協(xié)程等待事件,重連協(xié)程會使這個事件響應(yīng),這樣就可以繼續(xù)發(fā)布。
在hbmqtt庫自帶例子中多是先connect,然后publish,然后disconnect,以此循環(huán),但主要考慮到連接后不主動斷開一提高程序效率,故沒有斷開操作。
這個問題的解決方式可能有點(diǎn)牽強(qiáng),園友們有沒有遇到過類似的問題,在publish過程中關(guān)閉mqtt服務(wù)器,再重新打開服務(wù)器,發(fā)布任務(wù)能夠繼續(xù)正常執(zhí)行,如果有好的解決方式,還望不吝賜教。
?
轉(zhuǎn)載于:https://www.cnblogs.com/xiating/p/8203615.html
總結(jié)
以上是生活随笔為你收集整理的mqtt异步publish方法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三、python+selenium
- 下一篇: jsp 记录1 bs/cs