运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践
我們一直都相信這樣一種說法:協程是比多線程更高效的一種并發工作方式,它完全由程序本身所控制,也就是在用戶態執行,協程避免了像線程切換那樣產生的上下文切換,在性能方面得到了很大的提升。毫無疑問,這是顛撲不破的業界共識,是放之四海而皆準的真理。
但事實上,協程遠比大多數人想象中的復雜,正因為協程的“用戶態”特性,任務調度權掌握在撰寫協程任務的人手里,而僅僅依賴async和await關鍵字遠遠達不到“調度”的級別,有時候反而會拖累任務效率,使其在任務執行效率上還不及“系統態”的多線程和多進程,本次我們來探討一下Python3原生協程任務的調度管理。
Python3.10協程庫async.io的基本操作
事件循環(Eventloop)是 原生協程庫asyncio 的核心,可以理解為總指揮。Eventloop實例提供了注冊、取消和執行任務和回調的方法。
Eventloop可以將一些異步方法綁定到事件循環上,事件循環會循環執行這些方法,但是和多線程一樣,同時只能執行一個方法,因為協程也是單線程執行。當執行到某個方法時,如果它遇到了阻塞,事件循環會暫停它的執行去執行其他的方法,與此同時為這個方法注冊一個回調事件,當某個方法從阻塞中恢復,下次輪詢到它的時候將會繼續執行,亦或者,當沒有輪詢到它,它提前從阻塞中恢復,也可以通過回調事件進行切換,如此往復,這就是事件循環的簡單邏輯。
而上面最核心的動作就是切換別的方法,怎么切換?用await關鍵字:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def main(): await job1() await job2() if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job1結束 job2開始是的,切則切了,可切的對嗎?事實上這兩個協程任務并沒有達成“協作”,因為它們是同步執行的,所以并不是在方法內await了,就可以達成協程的工作方式,我們需要并發啟動這兩個協程任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def main(): #await job1() #await job2() await asyncio.gather(job1(), job2()) if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job2開始 job1結束如果沒有asyncio.gather的參與,協程方法就是普通的同步方法,就算用async聲明了異步也無濟于事。而asyncio.gather的基礎功能就是將協程任務并發執行,從而達成“協作”。
但事實上,Python3.10也支持“同步寫法”的協程方法:
async def create_task(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) await task1 await task2這里我們通過asyncio.create_task對job1和job2進行封裝,返回的對象再通過await進行調用,由此兩個單獨的異步方法就都被綁定到同一個Eventloop了,這樣雖然寫法上同步,但其實是異步執行:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') async def job2(): print('job2開始') async def create_task(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) await task1 await task2 async def main(): #await job1() #await job2() await asyncio.gather(job1(), job2()) if __name__ == '__main__': asyncio.run(create_task())系統返回:
job1開始 job2開始 job1結束協程任務的上下游監控
解決了并發執行的問題,現在假設每個異步任務都會返回一個操作結果:
async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果"通過asyncio.gather方法,我們可以收集到任務執行結果:
async def main(): res = await asyncio.gather(job1(), job2()) print(res)并發執行任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): res = await asyncio.gather(job1(), job2()) print(res) if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job2開始 job1結束 ['job1', 'job2']但任務結果僅僅也就是方法的返回值,除此之外,并沒有其他有價值的信息,對協程任務的執行明細諱莫如深。
現在我們換成asyncio.wait方法:
async def main(): res = await asyncio.wait([job1(), job2()]) print(res)依然并發執行:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): res = await asyncio.wait([job1(), job2()]) print(res) if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job2開始 job1結束 ({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任務結果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任務結果'>}, set())可以看出,asyncio.wait返回的是任務對象,里面存儲了大部分的任務信息,包括執行狀態。
在默認情況下,asyncio.wait會等待全部任務完成 (return_when=‘ALL_COMPLETED’),它還支持 return_when=‘FIRST_COMPLETED’(第一個協程完成就返回)和 return_when=‘FIRST_EXCEPTION’(出現第一個異常就返回)。
這就非常令人興奮了,因為如果異步消費任務是發短信之類的需要統計達到率的任務,利用asyncio.wait特性,我們就可以第一時間記錄任務完成或者異常的具體時間。
協程任務守護
假設由于某種原因,我們手動終止任務消費:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) task1.cancel() res = await asyncio.gather(task1, task2) print(res) if __name__ == '__main__': asyncio.run(main())系統報錯:
File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main res = await asyncio.gather(task1, task2) asyncio.exceptions.CancelledError這里job1被手動取消,但會影響job2的執行,這違背了協程“互相提攜”的特性。
事實上,asyncio.gather方法可以捕獲協程任務的異常:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.create_task(job1()) task2 = asyncio.create_task(job2()) task1.cancel() res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())系統返回:
job2開始 [CancelledError(''), 'job2任務結果']可以看到job1沒有被執行,并且異常替代了任務結果作為返回值。
但如果協程任務啟動之后,需要保證任務情況下都不會被取消,此時可以使用asyncio.shield方法守護協程任務:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) res = await asyncio.gather(task1, task2,return_exceptions=True) task1.cancel() print(res) if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job2開始 job1結束 ['job1任務結果', 'job2任務結果']協程任務回調
假設協程任務執行完畢之后,需要立刻進行回調操作,比如將任務結果推送到其他接口服務上:
import asyncio async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" def callback(future): print(f'回調任務: {future.result()}') async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) task1.add_done_callback(callback) res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())這里我們通過add_done_callback方法對job1指定了callback方法,當任務執行完以后,callback會被調用,系統返回:
job1開始 job2開始 job1結束 回調任務: job1任務結果 ['job1任務結果', 'job2任務結果']與此同時,add_done_callback方法不僅可以獲取協程任務返回值,它自己也支持參數參數傳遞:
import asyncio from functools import partial async def job1(): print('job1開始') await asyncio.sleep(1) print('job1結束') return "job1任務結果" async def job2(): print('job2開始') return "job2任務結果" def callback(future,num): print(f"回調參數{num}") print(f'回調任務: {future.result()}') async def main(): task1 = asyncio.shield(job1()) task2 = asyncio.create_task(job2()) task1.add_done_callback(partial(callback,num=1)) res = await asyncio.gather(task1, task2,return_exceptions=True) print(res) if __name__ == '__main__': asyncio.run(main())系統返回:
job1開始 job2開始 job1結束 回調參數1 回調任務: job1任務結果 ['job1任務結果', 'job2任務結果']結語
成也用戶態,敗也用戶態。所謂水能載舟亦能覆舟,協程消費任務的調度遠比多線程的系統級調度要復雜,稍不留神就會造成業務上的“同步”阻塞,弄巧成拙,適得其反。這也解釋了為什么相似場景中多線程的出場率要遠遠高于協程,就是因為多線程不需要考慮啟動后的“切換”問題,無為而為,簡單粗暴。
總結
以上是生活随笔為你收集整理的运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 程序员的十大级别--看看你是哪个级别
- 下一篇: 3D打印塑料钢网全流程介绍(文件输出、P