python 异步编程:协程与 asyncio
文章目錄
- 一、協程(coroutine)
- 1.1 協程的概念
- 1.2 實現協程的方式
- 二、asyncio 異步編程
- 2.1 事件循環
- 2.2 快速上手
- 2.3 運行協程
- 2.4 await 關鍵字
- 2.5 可等待對象
- 2.5.1 協程
- 2.5.2 任務(Task)
- 2.5.3 asyncio.Future
- 三、concurrent.futures.Future(補充)
- 3.1 爬蟲案例(asyncio+不支持異步的模塊)
- 四、asyncio 異步迭代器
- 五、asyncio 異步上下文管理
- 六、Uvloop
- 七、實戰案例
- 7.1 異步Redis
- 7.2 異步MySQL
- 7.3 FastAPI框架
一、協程(coroutine)
1.1 協程的概念
**協程也叫“輕量級線程”,是一種用戶態中來回切換代碼塊執行的技術,目的是減少阻塞,提高程序的運行速度。**協程不是計算機提供的功能,而是需要程序員自己實現的,是單個線程內的任務調度技巧。
假設,現在要執行2個函數(不用管誰先完成),這2個函數在運行中各需要3秒阻塞時間去等待網絡 IO 的完成:
- 不使用協程:一個阻塞3秒并執行完成后,另一個才去執行,同樣阻塞3秒,總共需要6秒的時間,才能完成兩個函數的執行;
- 使用協程后:先執行的函數,遇到阻塞后,解釋器會立馬保存阻塞函數的現場數據,并調用另一個函數執行,這樣,就相當于同時執行兩個函數,總共只需要3秒的時間。大大節省了運行時間,提高了程序的運行效率。
1.2 實現協程的方式
- greenlet、gevent 等第三方模塊;
- yield 關鍵字;
- asyncio 裝飾器(python 3.4版本);
- async、await 關鍵字(python 3.5及以上版本)【推薦】。
二、asyncio 異步編程
2.1 事件循環
事件循環可以理解為一個不斷檢測并執行代碼的死循環,是 python 協程系統的核心。它維護著一個任務隊列,在整個程序運行過程中,不斷循環執行隊列中的任務,一旦發生阻塞就切換任務。
import asyncio # python 自帶# 獲取一個事件循環 loop = asyncio.get_event_loop() # 將任務放到 loop.run_until_complete(任務)2.2 快速上手
import asyncioasync def main(): # 定義協程函數print('hello') await asyncio.sleep(1)print('world') asyncio.run(main()) # 運行協程函數 """ 輸出: hello world """注意!執行協程函數只會得到協程對象,不會立刻執行函數內的代碼。
main() <coroutine object main at 0x1053bb7c8>2.3 運行協程
要真正運行一個協程,asyncio 提供了三種主要機制:
-
第一種:用asyncio.run()函數用來運行最高層級的入口點main()函數。 (參見上面的示例)
-
第二種:使用await關鍵字”等待“一個協程對象(await后面會詳解)。以下代碼段會在等待 1 秒后打印 “hello”,然后再次等待 2 秒后打印 “world”:
import asyncio import timeasync def say_after(delay, what):await asyncio.sleep(delay) # 這樣才會真正執行 sleep 協程函數print(what)async def main():print(f"開始:{time.strftime('%X')}")await say_after(1, 'hello')await say_after(2, 'world')print(f"結束:{time.strftime('%X')}")asyncio.run(main())預期的輸出:
開始:17:13:52 hello world 結束:17:13:55 -
第三種:asyncio.create_task()函數用來并發運行作為 asyncio 任務的多個協程。
讓我們修改以上示例,并發運行兩個 say_after 協程:
async def main():task1 = asyncio.create_task(say_after(1, 'hello'))task2 = asyncio.create_task(say_after(2, 'world'))print(f"開始:{time.strftime('%X')}")# 等帶兩個任務都完成,大約需要2秒await task1await task2print(f"結束:{time.strftime('%X')}")注意,預期的輸出顯示代碼段的運行時間比之前快了 1 秒:
started at 17:14:32 hello world finished at 17:14:34
2.4 await 關鍵字
await 可等待對象:表示遇到阻塞后,先掛起當前協程(任務),讓事件循環去執行其他任務(如果有其他任務的話),等待“可等待對象”執行完成后,再繼續執行下面的代碼。
import asyncioasync def main():print('hello')# 會掛起 main 一秒鐘,然后打印 world# 一般用于后續步驟需要可等待對象完成后才能執行的情況await asyncio.sleep(1)print('world')如果可等待對象有返回值,可以直接保存:result = await 可等待對象。
2.5 可等待對象
可等待對象是指可以在await語句中使用的對象,它主要有三種類型:協程、任務和 Future。
2.5.1 協程
在本文中,“協程”可用來表示兩個緊密關聯的概念:
-
協程函數:使用async def 函數名定義的函數。
-
協程對象:調用協程函數得到的對象。
asyncio 也支持舊式的基于生成器(yield 關鍵字)的協程對象。
2.5.2 任務(Task)
當一個協程被asyncio.create_task()等函數封裝成一個任務,該協程就會被自動調度執行:
import asyncioasync def nested():return 42async def main():# 創建任務,并將 nested 函數添加到事件循環 task1 = asyncio.create_task(nested())task2 = asyncio.create_task(nested())# 可以給任務起一個名稱# task = asyncio.create_task(nested(), name="t1")# 等待 task 結束await task1await task2asyncio.run(main())上面的方法不常用,更加常用的方法是:
import asyncioasync def nested():return 42async def main():# 創建任務,并將 nested 函數添加到事件循環 task_list = [asyncio.create_task(nested(),name="t1"),asyncio.create_task(nested(),name="t2")]# 等待 task 結束done, pending = await asyncio.wait(task_list, timeout=3) # 超時時間是可選的 asyncio.run(main())說明:
- done:所有任務完成后的返回結果的集合。
- pending:不常用,任務超時后返回的結果集合。
2.5.3 asyncio.Future
Future 是一個比較底層的可等待對象,任務(Task)是基于 Future 的。Future 一般不會直接用,它表示一個異步操作的最終結果。當一個 Future 對象被等待,這意味著協程將保持等待直到該 Future 對象在其他地方操作完畢。
async def main():await function_that_returns_a_future_object()# 下面的寫法也是有效的await asyncio.gather(function_that_returns_a_future_object(),some_python_coroutine())三、concurrent.futures.Future(補充)
該 Future 對象用于線程池、進程池實現異步操作時用,與 asyncio.Future 沒有任何關系,僅僅是名稱相同而已。
import time from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutordef func(val):time.sleep(1)print(val)return "abc"# 創建線程池 pool = ThreadPoolExecutor(max_workers=5)# 創建進程池 # pool = ProcessPoolExecutor(max_workers=5)for i in range(10):fut = pool.submit(func, i) # fut 就是 concurrent.futures.Future 對象print(fut)在實際開發中,可能會出現多進程、多線程和協程交叉實現的情況。比如:基于協程的異步編程 + MySQL(不支持異步)。但我們可以這么做:
import time import asyncio import concurrent.futuresdef func1():"""某個耗時操作"""time.sleep(2)return "abc"async def main():# 獲取事件循環loop = async.get_running_loop()# 1. 在默認的循環執行器中運行result = await loop.run_in_executor(None, func1) # 第一個print('default thread pool', result)# 2. 在自定義線程池中運行with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom thread pool', result)# 3. 在自定義進程池中運行with concurrent.futures.ProcessPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom process pool', result)asyncio.run(main())說明:
run_in_executor的參數:
- 第一個參數是concurrent.futures.Executor實例,如果為None,則使用默認的執行器。
- 第二個參數就是要執行的函數。
run_in_executor內部做了兩件事情:
3.1 爬蟲案例(asyncio+不支持異步的模塊)
import asyncio import requests async def download_image(url):# 發送網絡請求,下載圖片(遇到網絡下載圖片的IO請求,自動化切換到其他任務)print("開始下載:", url)loop = asyncio.get_event_loop()# requests模塊默認不支持異步操作,所以就使用線程池來配合實現了。future = loop.run_in_executor(None, requests.get, url)response = await futureprint('下載完成')# 圖片保存到本地文件file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as file_object:file_object.write(response.content) if __name__ == '__main__':url_list = ['https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg','https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg','https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg']tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete( asyncio.wait(tasks) )四、asyncio 異步迭代器
-
異步迭代器:
異步迭代器與普通的迭代器是基本一致的,只不過內部實現的是__aiter__()和__anext__()方法。__anext__()必須返回一個awaitable對象。async for會處理異步迭代器的__anext__()方法所返回的可等待對象,知道引發一個StopAsyncIteration異常。
-
異步可迭代對象:
可以在async for語句中使用的對象,必須通過它的__aiter__()方法返回一個異步迭代器。
舉例:
import asyncioclass Reader(object):""" 自定義異步迭代器(同時也是異步可迭代對象) """def __init__(self):self.count = 0async def readline(self):# await asyncio.sleep(1)self.count += 1if self.count == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn valasync def func():# 創建異步可迭代對象async_iter = Reader()# async for 必須要放在async def函數內,否則語法錯誤。async for item in async_iter:print(item)asyncio.run(func())異步迭代器其實沒什么太大的作用,只是支持了async for語法而已。
五、asyncio 異步上下文管理
異步上下文管理需要實現的是__aenter__()和__aexit__()方法,以此實現對async with語句中的環境進行控制。
import asyncioclass AsyncContextManager:def __init__(self):self.conn = Noneasync def do_something(self):# 異步操作數據庫return 666async def __aenter__(self):# 異步鏈接數據庫self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):# 異步關閉數據庫鏈接await asyncio.sleep(1)async def func():# 與異步迭代器一樣,必須放在協程函數內async with AsyncContextManager() as f:result = await f.do_something()print(result)asyncio.run(func())六、Uvloop
Python標準庫中提供了asyncio模塊,用于支持基于協程的異步編程。
uvloop 是 asyncio 中的事件循環的替代方案,替換后可以使得asyncio性能提高。事實上,uvloop要比nodejs、gevent等其他python異步框架至少要快2倍,性能可以比肩Go語言。
安裝uvloop:
pip install uvloop在項目中想要使用uvloop替換asyncio的事件循環也非常簡單,只要在代碼中這么做就行。
import asyncioimportuvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 編寫asyncio的代碼,與之前寫的代碼一致。# 內部的事件循環自動化會變為 uvloopasyncio.run(...)注意:知名的 asgi uvicorn 內部就是使用的uvloop的事件循環。
七、實戰案例
7.1 異步Redis
安裝 aioredis 模塊:
pip3 install aioredis示例1:異步操作redis,在遇到 IO 等待的地方,使用 await 關鍵字。
import asyncio import aioredisasync def execute(address, password):print("開始執行", address)# 網絡IO操作:創建redis連接redis = await aioredis.create_redis(address, password=password)# 網絡IO操作:在redis中設置哈希值car,內部在設三個鍵值對,即: redis = { car:{key1:1,key2:2,key3:3}}await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 網絡IO操作:去redis中獲取值result = await redis.hgetall('car', encoding='utf-8')print(result)redis.close()# 網絡IO操作:關閉redis連接await redis.wait_closed()print("結束", address)asyncio.run(execute('redis://47.93.4.198:6379', "root12345"))示例2:連接多個redis做操作(遇到IO會切換其他任務,提供了性能)。
import asyncio import aioredisasync def execute(address, password):print("開始執行", address)# 網絡IO操作:先去連接 47.93.4.197:6379,遇到IO則自動切換任務,去連接47.93.4.198:6379redis = await aioredis.create_redis_pool(address, password=password)# 網絡IO操作:遇到IO會自動切換任務await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 網絡IO操作:遇到IO會自動切換任務result = await redis.hgetall('car', encoding='utf-8')print(result)redis.close()# 網絡IO操作:遇到IO會自動切換任務await redis.wait_closed()print("結束", address)task_list = [execute('redis://47.93.4.197:6379', "root12345"),execute('redis://47.93.4.198:6379', "root12345")]asyncio.run(asyncio.wait(task_list))更多redis操作參考 aioredis 官網:傳送門
7.2 異步MySQL
當通過python去操作MySQL時,連接、執行SQL、關閉都涉及網絡IO請求,使用asycio異步的方式可以在IO等待時去做一些其他任務,從而提升性能。
安裝Python異步操作redis模塊
pip3 install aiomysql例子1:
import asyncio import aiomysql async def execute():# 網絡IO操作:連接MySQL conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')# 網絡IO操作:創建CURSOR cur = await conn.cursor()# 網絡IO操作:執行SQLawait cur.execute("SELECT Host,User FROM user")# 網絡IO操作:獲取SQL結果result = await cur.fetchall()print(result)# 網絡IO操作:關閉鏈接await cur.close()conn.close()asyncio.run(execute())例子2:
import asyncio import aiomysql async def execute(host, password):print("開始", host)# 網絡IO操作:先去連接 47.93.40.197,遇到IO則自動切換任務,去連接47.93.40.198:6379conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')# 網絡IO操作:遇到IO會自動切換任務cur = await conn.cursor()# 網絡IO操作:遇到IO會自動切換任務await cur.execute("SELECT Host,User FROM user")# 網絡IO操作:遇到IO會自動切換任務result = await cur.fetchall()print(result)# 網絡IO操作:遇到IO會自動切換任務await cur.close()conn.close()print("結束", host)task_list = [execute('47.93.40.197', "root!2345"),execute('47.93.40.197', "root!2345")]asyncio.run(asyncio.wait(task_list))7.3 FastAPI框架
FastAPI 是一款用于構建 API 的高性能 web 框架,框架基于 Python3.6+的 type hints搭建。
接下來的異步示例以FastAPI和uvicorn來講解(uvicorn是一個支持異步的asgi)。
安裝 FastAPI:
pip3 install fastapi安裝 uvicorn:
pip3 install uvicorn舉例:
import asyncio import uvicorn import aioredis from aioredis import Redis from fastapi import FastAPIapp = FastAPI()REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123",minsize=1, maxsize=10)@app.get("/") def index():""" 普通操作接口"""return {"message": "Hello World"}@app.get("/red") async def red():""" 異步操作接口"""print("請求來了")await asyncio.sleep(3)# 連接池獲取一個連接conn = await REDIS_POOL.acquire()redis = Redis(conn)# 設置值await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 讀取值result = await redis.hgetall('car', encoding='utf-8')print(result)# 連接歸還連接池REDIS_POOL.release(conn)return resultif __name__ == '__main__':uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")在有多個用戶并發請求的情況下,異步方式來編寫的接口可以在 IO 等待過程中去處理其他的請求,提供性能。這就是 FastAPI 如此高性能的原因所在。
總結
以上是生活随笔為你收集整理的python 异步编程:协程与 asyncio的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 光学指纹场景使用OLED HBM功能
- 下一篇: lr增强细节_LR和PS如何进行风光后期