python 使用 asyncio 包处理并发
生活随笔
收集整理的這篇文章主要介紹了
python 使用 asyncio 包处理并发
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
文章目錄
- 1. 線程與協程對比
- 2. 使用 asyncio 和 aiohttp 下載
- 3. 避免阻塞型調用
- 4. 使用 asyncio.as_completed
- 5. 使用 Executor 對象,防止阻塞事件循環
- 6. 從回調到期物和協程
learn from 《流暢的python》
1. 線程與協程對比
threading
import itertools
import sys
import threading
import time


class Signal:
    """Mutable flag shared between the main thread and the spinner thread."""

    # The spinner keeps animating for as long as this is true.
    go = True


def spin(msg, signal):
    """Animate a text spinner in front of *msg* until ``signal.go`` is false.

    Args:
        msg: Text displayed after the spinning character.
        signal: Object whose ``go`` attribute is polled once per frame.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless loop over the frames
        status = char + ' ' + msg
        write(status)
        flush()
        # "\x08" is backspace: move the cursor back to the start of the line.
        write("\x08" * len(status))
        time.sleep(0.1)
        if not signal.go:
            break
    # Overwrite the status with spaces, then move the cursor back again.
    write(' ' * len(status) + "\x08" * len(status))


def slow_function(seconds=10):
    """Stand in for a slow computation and return its result.

    Args:
        seconds: How long to sleep; defaults to the original 10 seconds.

    Returns:
        The constant 42.
    """
    # Sleeping blocks the calling thread but releases the GIL, so the
    # spinner thread keeps running in the meantime.
    time.sleep(seconds)
    return 42


def supervisor(seconds=10):
    """Run ``slow_function`` while a secondary thread shows a spinner.

    Args:
        seconds: Forwarded to ``slow_function``.

    Returns:
        Whatever ``slow_function`` returns.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=("thinking!", signal))
    print("spinner object:", spinner)  # show the secondary thread object
    spinner.start()
    try:
        # Blocks the main thread; the spinner animates meanwhile.
        result = slow_function(seconds)
    finally:
        # Always stop and reap the spinner, even when slow_function raises;
        # otherwise the non-daemon thread would loop forever and keep the
        # process alive.
        signal.go = False
        spinner.join()
    return result


def main():
    result = supervisor()
    print("Answer:", result)


if __name__ == '__main__':
    main()

# Article text: coroutines suited to asyncio must be driven by the caller and
# invoked by the caller via `yield from` (that syntax is outdated; newer code
# uses async / await),
或者把協程傳給 asyncio 包中的某個函數
一篇博文參考:https://www.cnblogs.com/dhcn/p/9032461.html
import asyncio
import itertools
import sys

# https://docs.python.org/3.8/library/asyncio.html


async def spin(msg):
    """Animate a text spinner in front of *msg* until the task is cancelled.

    The coroutine catches ``asyncio.CancelledError`` itself, prints
    ``cancel``, clears the status text and finishes normally.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # endless loop over the frames
        status = char + ' ' + msg
        write(status)
        flush()
        # "\x08" is backspace: move the cursor back to the start of the line.
        write("\x08" * len(status))
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:  # cancellation request: leave the loop
            print("cancel")
            break
    write(' ' * len(status) + "\x08" * len(status))
    print("end spin")


async def slow_function(delay=3):
    """Pretend to perform slow I/O and return the result.

    Args:
        delay: Seconds to sleep; defaults to the original 3 seconds.

    Returns:
        The constant 42.
    """
    print("start IO")
    await asyncio.sleep(delay)  # pretend to do I/O
    print("end IO ")
    return 42


async def supervisor(delay=3):
    """Run ``slow_function`` while a spinner task animates concurrently.

    Args:
        delay: Forwarded to ``slow_function``.

    Returns:
        Whatever ``slow_function`` returns.
    """
    spinner = asyncio.create_task(spin("thinking!"))  # schedule the spinner
    print("spinner object:", spinner)
    # e.g. spinner object: <Task pending coro=<spin() running at D:\ >
    print("start slow")
    try:
        result = await slow_function(delay)
        print("end slow")
    finally:
        # Cancel the spinner even when slow_function raises, then wait for it
        # so its cleanup output is printed before this coroutine returns.
        # return_exceptions=True keeps a CancelledError raised by the spinner
        # (e.g. when cancelled before its first step) from propagating here.
        spinner.cancel()
        await asyncio.gather(spinner, return_exceptions=True)
    return result


def main():
    # asyncio.run creates the event loop, drives supervisor to completion
    # and closes the loop afterwards.
    result = asyncio.run(supervisor())
    print("answer:", result)


if __name__ == '__main__':
    main()

# Output:
spinner object: <Task pending coro=<spin() running at D:\gitcode >
start slow
start IO
end IO ng!(期間 thinking! 在輸出,后來被覆蓋)
end slow
cancel
end spin
answer: 42
請按任意鍵繼續. . .
2. 使用 asyncio 和 aiohttp 下載
import asyncio
import os
import sys
import time

import aiohttp

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'


def save_flag(img, filename):
    """Write the image bytes *img* to *filename* inside ``DEST_DIR``."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space and flush stdout immediately."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Download the flag image for country code *cc* and return its bytes."""
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        image = await resp.read()
    return image


async def download_one(cc):
    """Fetch one flag, report it on stdout, save it, and return *cc*."""
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def _download_all(cc_list):
    """Run one download per country code; return ``(cc, outcome)`` pairs."""
    codes = sorted(cc_list)
    # gather wraps the coroutines in tasks itself. Passing bare coroutine
    # objects to asyncio.wait is rejected by Python 3.11+.
    # return_exceptions=True lets the remaining downloads finish when one
    # of them fails; the failure is returned in place of a result.
    outcomes = await asyncio.gather(
        *(download_one(cc) for cc in codes), return_exceptions=True)
    return list(zip(codes, outcomes))


def download_many_(cc_list):
    """Download all flags concurrently.

    Args:
        cc_list: Iterable of country codes.

    Returns:
        The number of flags downloaded successfully. Failed downloads are
        reported on stderr and not counted.
    """
    downloaded = 0
    # asyncio.run creates the event loop and closes it when done, so no
    # explicit loop.close() is needed.
    for cc, outcome in asyncio.run(_download_all(cc_list)):
        if isinstance(outcome, BaseException):
            print('\n*** {} failed: {!r}'.format(cc, outcome),
                  file=sys.stderr)
        else:
            downloaded += 1
    return downloaded


def main(download_many):
    """Time ``download_many(POP20_CC)`` and print a summary line."""
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)

# Sample output:
# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN
# 20 flags downloaded in 3.88s

# Article section heading: 3. Avoiding blocking calls
執行硬盤或網絡 I/O 操作的函數定義為 阻塞型函數
有兩種方法能 避免阻塞型調用 中止整個應用程序 的進程:
- 在單獨的線程中運行各個阻塞型操作
- 把每個阻塞型操作 轉換成非阻塞的異步調用 使用
4. 使用 asyncio.as_completed
import asyncio
import collections
import os
import sys
import time
from http import HTTPStatus

import aiohttp
import tqdm
from aiohttp import web

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):
    """Raised when a flag download fails; carries the country code."""

    def __init__(self, country_code):
        # Pass the code to Exception so str()/repr() of the error show it.
        super().__init__(country_code)
        self.country_code = country_code


def save_flag(img, filename):
    """Write the image bytes *img* to *filename* inside ``DEST_DIR``."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space and flush stdout immediately."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Download the flag image for country code *cc*.

    Returns:
        The image bytes when the server answers 200.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.ClientResponseError: The server answered any other status.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        if resp.status == 200:
            image = await resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            # This is a plain HTTP failure, so raise the HTTP client error
            # type rather than a WebSocket-specific exception.
            raise aiohttp.ClientResponseError(
                resp.request_info,
                resp.history,
                status=resp.status,
                message=resp.reason or '',
                headers=resp.headers,
            )


async def download_one(cc, semaphore, verbose):
    """Download and save one flag while holding *semaphore*.

    Args:
        cc: Country code.
        semaphore: Async context manager limiting concurrent requests.
        verbose: When true, print the per-country outcome.

    Returns:
        A ``(status, cc)`` tuple where status is ``HTTPStatus.OK`` or
        ``HTTPStatus.NOT_FOUND``.

    Raises:
        FetchError: Any failure other than a 404, chained to its cause.
    """
    try:
        async with semaphore:
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)


async def downloader_coro(cc_list, verbose, concur_req):
    """Download all flags and tally the outcomes.

    Args:
        cc_list: Country codes to download.
        verbose: Print per-country messages when true; otherwise show a
            tqdm progress bar.
        concur_req: Initial value of the semaphore shared by all downloads.

    Returns:
        A ``collections.Counter`` keyed by ``HTTPStatus``. Failures are
        counted under ``HTTPStatus.INTERNAL_SERVER_ERROR``, used here as the
        generic error bucket because ``HTTPStatus`` has no ``error`` member.
    """
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(value=concur_req)
    # Forward the caller's verbose flag instead of hard-coding True.
    todo = [download_one(cc, semaphore, verbose=verbose)
            for cc in sorted(cc_list)]
    todo_iter = asyncio.as_completed(todo)  # yields awaitables as they finish
    if not verbose:
        todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))  # progress bar
    for future in todo_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            cause = exc.__cause__
            # Fall back to the class name when the cause has an empty message.
            error_msg = str(cause) or cause.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.INTERNAL_SERVER_ERROR
        else:
            status = res[0]
        counter[status] += 1  # record the outcome
    return counter


def download_many_(cc_list, verbose, concur_req):
    """Drive ``downloader_coro`` to completion and return its counter."""
    coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)
    # asyncio.run creates the event loop and closes it when done.
    counts = asyncio.run(coro)
    return counts


def main(download_many):
    """Time the download of ``POP20_CC`` and print a summary line.

    ``download_many`` must return a counter keyed by ``HTTPStatus``; only
    the ``HTTPStatus.OK`` count is reported as downloaded.
    """
    t0 = time.time()
    count = download_many(POP20_CC, True, MAX_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count[HTTPStatus.OK], elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)

# Article section heading: 5. Using an Executor object to avoid blocking the
# event loop
- loop.run_in_executor 方法把阻塞的作業(例如保存文件)委托給線程池做
6. 從回調到期物和協程
- 如果一個操作需要依賴之前操作的結果,那就得嵌套回調
好的寫法:
async def three_stages(request1):
    """Chain three asynchronous API calls, feeding each result onward."""
    # Stage one
    first_reply = await api_call1(request1)
    second_request = step1(first_reply)
    # Stage two
    second_reply = await api_call2(second_request)
    third_request = step2(second_reply)
    # Stage three
    third_reply = await api_call3(third_request)
    step3(third_reply)


# The coroutine has to be scheduled explicitly.
loop.create_task(three_stages(request1))

# Article text: the event loop must be used to explicitly schedule the
# coroutine's execution.
異步系統 能 避免用戶級線程的開銷,這是它能比多線程系統管理更多并發連接的主要原因
總結
以上是生活随笔為你收集整理的python 使用 asyncio 包处理并发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据仓库 Hive(内含大数据镜像下载)
- 下一篇: 利用bds和dfs解决 LeetCode