python asyncio理解_深入理解asyncio(二)
Asyncio.gather vs asyncio.wait
在上篇文章已經看到多次用asyncio.gather了,還有另外一個用法是asyncio.wait,他們都可以讓多個協程并發執行。那為什么提供2個方法呢?他們有什么區別,適用場景是怎么樣的呢?其實我之前也是有點困惑,直到我讀了asyncio的源碼。我們先看2個協程的例子:
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
return 'A'
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
return 'B'
在IPython里面用gather執行一下:
In : return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a
In : return_value_a, return_value_b
Out: ('A', 'B')
Ok,asyncio.gather方法的名字說明了它的用途,gather的意思是「搜集」,也就是能夠收集協程的結果,而且要注意,它會按輸入協程的順序保存的對應協程的執行結果。
接著我們說asyncio.await,先執行一下:
In : done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a
In : done
Out:
{:1> result='A'>,
:8> result='B'>}
In : pending
Out: set()
In : task = list(done)[0]
In : task
Out: :8> result='B'>
In : task.result()
Out: 'B'
asyncio.wait的返回值有2項,第一項表示完成的任務列表(done),第二項表示等待(Future)完成的任務列表(pending),每個任務都是一個Task實例,由于這2個任務都已經完成,所以可以執行task.result()獲得協程返回值。
Ok, 說到這里,我總結下它倆的區別的第一層區別:asyncio.gather封裝的Task全程黑盒,只告訴你協程結果。
asyncio.wait會返回封裝的Task(包含已完成和掛起的任務),如果你關注協程執行結果你需要從對應Task實例里面用result方法自己拿。
為什么說「第一層區別」,asyncio.wait看名字可以理解為「等待」,所以返回值的第二項是pending列表,但是看上面的例子,pending是空集合,那么在什么情況下,pending里面不為空呢?這就是第二層區別:asyncio.wait支持選擇返回的時機。
asyncio.wait支持一個接收參數return_when,在默認情況下,asyncio.wait會等待全部任務完成(return_when='ALL_COMPLETED'),它還支持FIRST_COMPLETED(第一個協程完成就返回)和FIRST_EXCEPTION(出現第一個異常就返回):
In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
Suspending a
Suspending b
Resuming b
In : done
Out: {:8> result='B'>}
In : pending
Out: {:3> wait_for=()]>>}
看到了吧,這次只有協程b完成了,協程a還是pending狀態。
在大部分情況下,用asyncio.gather是足夠的,如果你有特殊需求,可以選擇asyncio.wait,舉2個例子:需要拿到封裝好的Task,以便取消或者添加成功回調等
業務上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的
asyncio.create_task vs loop.create_task vs asyncio.ensure_future
創建一個Task一共有3種方法,如這小節的標題。在上篇文章我說過,從Python 3.7開始可以統一的使用更高階的asyncio.create_task。其實asyncio.create_task就是用的loop.create_task:
def create_task(coro):
loop = events.get_running_loop()
return loop.create_task(coro)
loop.create_task接受的參數需要是一個協程,但是asyncio.ensure_future除了接受協程,還可以是Future對象或者awaitable對象:如果參數是協程,其實底層還是用的loop.create_task,返回Task對象
如果是Future對象會直接返回
如果是一個awaitable對象會await這個對象的__await__方法,再執行一次ensure_future,最后返回Task或者Future
所以就像ensure_future名字說的,確保這個是一個Future對象:Task是Future 子類,前面說過一般情況下開發者不需要自己創建Future
其實前面說的asyncio.wait和asyncio.gather里面都用了asyncio.ensure_future。對于絕大多數場景要并發執行的是協程,所以直接用asyncio.create_task就足夠了~
shield
接著說asyncio.shield,用它可以屏蔽取消操作。一直到這里,我們還沒有見識過Task的取消。看一個例子:
In : loop = asyncio.get_event_loop()
In : task1 = loop.create_task(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2)
Suspending a
Suspending b
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
cell_name in async-def-wrapper()
CancelledError:
在上面的例子中,task1被取消了后再用asyncio.gather收集結果,直接拋CancelledError錯誤了。這里有個細節,gather支持return_exceptions參數:
In : await asyncio.gather(task1, task2, return_exceptions=True)
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到,task2依然會執行完成,但是task1的返回值是一個CancelledError錯誤,也就是任務被取消了。如果一個創建后就不希望被任何情況取消,可以使用asyncio.shield保護任務能順利完成。不過要注意一個陷阱,先看錯誤的寫法:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2, return_exceptions=True)
Suspending a
Suspending b
Resuming b
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到依然是CancelledError錯誤,且協程a未執行完成,正確的用法是這樣的:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : ts = asyncio.gather(task1, task2, return_exceptions=True)
In : task1.cancel()
Out: True
In : await ts
Suspending a
Suspending b
Resuming a
Resuming b
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到雖然結果是一個CancelledError錯誤,但是看輸出能確認協程實際上是執行了的。所以正確步驟是:先創建 GatheringFuture 對象 ts
取消任務
await ts
asynccontextmanager
如果你了解Python,之前可能聽過或者用過contextmanager ,一個上下文管理器。通過一個計時的例子就理解它的作用:
from contextlib import contextmanager
async def a():
await asyncio.sleep(3)
return 'A'
async def b():
await asyncio.sleep(1)
return 'B'
async def s1():
return await asyncio.gather(a(), b())
@contextmanager
def timed(func):
start = time.perf_counter()
yield asyncio.run(func())
print(f'Cost: {time.perf_counter() - start}')
timed函數用了contextmanager裝飾器,把協程的運行結果yield出來,執行結束后還計算了耗時:
In : from contextmanager import *
In : with timed(s1) as rv:
...: print(f'Result: {rv}')
...:
Result: ['A', 'B']
Cost: 3.0052654459999992
大家先體會一下。在Python 3.7添加了asynccontextmanager,也就是異步版本的contextmanager,適合異步函數的執行,上例可以這么改:
@asynccontextmanager
async def async_timed(func):
start = time.perf_counter()
yield await func()
print(f'Cost: {time.perf_counter() - start}')
async def main():
async with async_timed(s1) as rv:
print(f'Result: {rv}')
In : asyncio.run(main())
Result: ['A', 'B']
Cost: 3.00414147500004
async版本的with要用async with,另外要注意yield await func()這句,相當于yield + await func()
PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源碼注釋,可以看延伸閱讀鏈接2,另外延伸閱讀鏈接3包含的PR中相關的測試代碼部分也能幫助你理解
代碼目錄
本文代碼可以在 mp項目 找到
延伸閱讀
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的python asyncio理解_深入理解asyncio(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql统计数据的代码_MySQL按时
- 下一篇: 苹果xs max是双卡双待吗