python异步编程之asyncio低阶API
低階API介紹
asyncio中低階API的種類很多,涉及到開發的5個方面。包括:
- 獲取事件循環
- 事件循環方法集
- 傳輸
- 協議
- 事件循環策略
本篇中只講解asyncio常見常用的函數,很多底層函數如網絡、IPC、套接字、信號等不在本篇范圍。
獲取事件循環
事件循環是異步中重要的概念之一,用于驅動任務的執行。包含的低階API如下:
| 函數 | 功能 |
|---|---|
| asyncio.get_running_loop() | 獲取當前運行的事件循環首選函數。 |
| asyncio.get_event_loop() | 獲得一個事件循環實例 |
| asyncio.set_event_loop() | 將策略設置到事件循環 |
| asyncio.new_event_loop() | 創建一個新的事件循環 |
在asyncio初識這篇中提到過事件循環,可以把事件循環當做是一個while循環,在周期性的運行并執行一些任務。這個說法比較抽象,事件循環本質上其實是能調用操作系統IO模型的模塊。以Linux系統為例,IO模型有阻塞,非阻塞,IO多路復用等。asyncio 常用的是IO多路復用模型的epool和 kqueue。事件循環原理涉及到異步編程的操作系統原理,后續更新一系列相關文章。
get_event_loop()
創建一個事件循環,用于驅動協程的執行
import asyncio
async def demo(i):
print(f"hello {i}")
def main():
loop = asyncio.get_event_loop()
print(loop._selector)
task = loop.create_task(demo(1))
loop.run_until_complete(task)
main()
結果:
<selectors.KqueueSelector object at 0x104eabe20>
hello 1
可以通過loop._selector屬性獲取到當前事件循環使用的是kqueue模型
獲取循環
import asyncio
async def demo(i):
res = asyncio.get_running_loop()
print(res)
print(f"hello {i}")
def main():
loop = asyncio.get_event_loop()
task = loop.create_task(demo(1))
loop.run_until_complete(task)
main()
結果:
<_UnixSelectorEventLoop running=True closed=False debug=False>
hello 1
推薦使用asyncio.run 創建事件循環,底層API主要用于庫的編寫。
生命周期
生命周期是用于管理任務的啟停的函數,如下:
| 函數 | 功能 |
|---|---|
| loop.run_until_complete() | 運行一個期程/任務/可等待對象直到完成。 |
| loop.run_forever() | 一直運行事件循環,直到被顯示停止 |
| loop.stop() | 停止事件循環 |
| loop.close() | 關閉事件循環 |
| loop.is_running() | 返回 True , 如果事件循環正在運行 |
| loop.is_closed() | 返回 True ,如果事件循環已經被關閉 |
| await loop.shutdown_asyncgens() | 關閉異步生成器 |
run_until_complete:
運行一個期程/任務/可等待對象直到完成。run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task。run_until_complete()是會自動關閉事件循環的函數,區別于run_forever()是需要手動關閉事件循環的函數。
import asyncio
async def demo(i):
print(f"hello {i}")
def main():
loop = asyncio.get_event_loop()
task = loop.create_task(demo(1))
# 傳入的是一個任務
loop.run_until_complete(task)
# 傳入的是一個協程也可以
loop.run_until_complete(demo(20))
main()
結果:
hello 1
hello 20
調試
| 函數 | 功能 |
|---|---|
| loop.set_debug() | 開啟或禁用調試模式 |
| loop.get_debug() | 獲取當前測試模式 |
調度回調函數
在異步編程中回調函數是一種很常見的方法,想要在事件循環中增加一些回調函數,可以有如下方法:
| 函數 | 功能 |
|---|---|
| loop.call_soon() | 盡快調用回調。 |
| loop.call_soon_threadsafe() | loop.call_soon() 方法線程安全的變體。 |
| loop.call_later() | 在給定時間之后調用回調函數。 |
| loop.call_at() | 在指定的時間調用回調函數。 |
這些回調函數既可以回調普通函數也可以回調協程函數。call_soon
函數原型:
loop.call_soon(callback, *args, context=None)
示例:
import asyncio
async def my_coroutine():
print("協程被執行")
async def other_coro():
print("非call_soon調用")
def callback_function():
print("回調函數被執行")
# 創建一個事件循環
loop = asyncio.get_event_loop()
# 使用create_task包裝協程函數,并調度執行
loop.call_soon(loop.create_task, my_coroutine())
# 調度一個常規函數以盡快執行
loop.call_soon(callback_function)
# 啟動一個事件循環
task = loop.create_task(other_coro())
loop.run_until_complete(task)
結果:
回調函數被執行
非call_soon調用
協程被執行
結果分析:
call_soon調用普通函數直接傳入函數名作為參數,調用協程函數需要講協程通過loop.create_task封裝成task。
線程/進程池
| 函數 | 功能 |
|---|---|
| await loop.run_in_executor() | 多線程中運行一個阻塞的函數 |
| loop.set_default_executor() | 設置 loop.run_in_executor() 默認執行器 |
asyncio.run_in_executor 用于在異步事件循環中執行一個阻塞的函數或方法。它將阻塞的調用委托給一個線程池或進程池,以確保不阻塞主事件循環。可以用于在協程中調用一些不支持異步編程的方法,不支持異步編程的模塊。
run_in_executor
import asyncio
import concurrent.futures
def blocking_function():
# 模擬一個阻塞的操作
import time
time.sleep(2)
return "阻塞函數返回"
async def async_function2():
print("async_function2 start")
await asyncio.sleep(1)
print("async_function2 end")
async def async_function():
print("異步函數開始執行。。。")
print("調用同步阻塞函數")
# 使用run_in_executor調度執行阻塞函數
result = await loop.run_in_executor(None, blocking_function)
print(f"獲取同步函數的結果: {result}")
# 創建一個事件循環
loop = asyncio.get_event_loop()
# 運行異步函數
loop.run_until_complete(asyncio.gather(async_function(), async_function2()))
結果:
異步函數開始執行。。。
調用同步阻塞函數
async_function2 start
async_function2 end
獲取同步函數的結果: 阻塞函數返回
結果分析:
通過事件循環執行任務async_function,在async_function中通過loop.run_in_executor調用同步阻塞函數blocking_function,該阻塞函數沒有影響事件循環中另一個任務async_function2的執行。await loop.run_in_executor(None, blocking_function)中None代表使用的是默認線程池,也可以替換成其他線程池。
使用自定義線程池和進程池
import asyncio
import concurrent.futures
def blocking_function():
# 模擬一個阻塞的操作
import time
time.sleep(2)
return "阻塞函數返回"
async def async_function():
print("異步函數開始執行。。。")
print("調用同步阻塞函數")
# 線程池
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_function)
print('線程池調用返回結果:', result)
# 進程池
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_function)
print('進程池調用返回結果:', result)
if __name__ == '__main__':
# 創建一個事件循環
loop = asyncio.get_event_loop()
# 運行異步函數
loop.run_until_complete(async_function())
結果:
異步函數開始執行。。。
調用同步阻塞函數
線程池調用返回結果: 阻塞函數返回
進程池調用返回結果: 阻塞函數返回
結果分析:
通過線程池concurrent.futures.ThreadPoolExecutor()和進程池concurrent.futures.ProcessPoolExecutor()執行阻塞函數。
任務與期程
| 函數 | 功能 |
|---|---|
| loop.create_future() | 創建一個 Future 對象。 |
| loop.create_task() | 將協程當作 Task 一樣調度。 |
| loop.set_task_factory() | 設置 loop.create_task() 使用的工廠,它將用來創建 Tasks 。 |
| loop.get_task_factory() | 獲取 loop.create_task() 使用的工廠,它用來創建 Tasks 。 |
create_future
create_future 的功能是創建一個future對象。future對象通常不需要手動創建,因為task會自動管理任務結果。相當于task是全自動,創建future是半自動。創建的future就需要手動的講future狀態設置成完成,才能表示task的狀態為完成。
import asyncio
def foo(future, result):
print(f"此時future的狀態:{future}")
future.set_result(result)
print(f"此時future的狀態:{future}")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 手動創建future對象
all_done = loop.create_future()
# 設置一個回調函數用于修改設置future的結果
loop.call_soon(foo, all_done, "Future is done!")
result = loop.run_until_complete(all_done)
print("返回結果", result)
print("獲取future的結果", all_done.result())
結果:
此時future的狀態:<Future pending cb=[_run_until_complete_cb() at /Users/lib/python3.10/asyncio/base_events.py:184]>
此時future的狀態:<Future finished result='Future is done!'>
返回結果 Future is done!
獲取future的結果 Future is done!
結果分析:
future設置結果之后之后,future對象的狀態就從pending變成finished狀態。如果一個future沒有手動設置結果,那么事件循環就不會停止。
create_task
將協程封裝成一個task對象,事件循環主要操作的是task對象。協程沒有狀態,而task是有狀態的。
import asyncio
async def demo(i):
print(f"hello {i}")
await asyncio.sleep(1)
def main():
loop = asyncio.get_event_loop()
# 將攜程封裝成task,給事件使用
task = loop.create_task(demo(1))
loop.run_until_complete(task)
main()
>>>
hello 1
asyncio.create_task 和 loop.create_task的區別:
兩者實現的功能都是一樣的,將協程封裝成一個task,讓協程擁有了生命周期。區別僅僅在于使用的方法。asyncio.create_task 是高階API,不需要創建事件循環,而loop.create_task需要先創建事件循環再使用該方法。
小結
以上是asyncio低階API的使用介紹,前一篇是高階API的使用介紹,用兩篇介紹了asyncio常見的函數,以后遇到asyncio相關的代碼就不會感到陌生。雖然asyncio是比較復雜的編程思想,但是有了這些函數的使用基礎,能夠更高效的掌握。
連載一系列關于python異步編程的文章。包括同異步框架性能對比、異步事情驅動原理等。歡迎關注微信公眾號第一時間接收推送的文章。
總結
以上是生活随笔為你收集整理的python异步编程之asyncio低阶API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Feign源码解析:初始化过程(三)
- 下一篇: kafka源码阅读之MacBook Pr