asyncio协程与并发
并發編程
Python的并發實現有三種方法。
基本概念
串行:同時只能執行單個任務
并行:同時執行多個任務
在Python中,雖然嚴格說來多線程與協程都是串行的,但其效率高,在遇到阻塞時會將阻塞任務交給系統執行,通過合理調度任務,使得程序高效。
最高效的當然是多進程了,但由于多進程依賴硬件配置,并且當任務量超過CPU核心數時,多進程會有進程上下文切換開銷,而這個開銷很大,所以不是最佳解決方案。
常見耗時場景
CPU計算密集型
多線程對比單線程,由于GIL的存在,切換線程需要不斷加鎖、釋放鎖,效率反而更低;多進程相當于多個CPU同時工作,因此效率很高。
IO密集型
IO密集型可以是磁盤IO、網絡IO、數據庫IO等,都屬于計算量小,IO等待浪費高。越是IO等待時間長,則多線程的優勢相比單線程越明顯,多進程效率高但依賴配置資源。
結論
單線程總是最慢的,多線程適合在IO密集型場景使用,多進程適合CPU計算要求高的場景下使用,多進程雖然總是最快的,但需要CPU資源支持。
多線程
Python創建多線程有兩種方法。
用函數創建多線程
from threading import Threaddef func():for i in range(2):print('Hello world!')sleep(1)th1 = Thread(target=func) th1.start() th2 = Thread(target=func) th2.start()用類創建多線程
這個類必須繼承Thread,必須重載run()方法
from threading import Threadclass MyThread(Thread):def __init__(self):super().__init__()self.name = 'Bob'def run(self):for i in range(2):print('Hello world!')sleep(1)th1 = MyThread() th2 = MyThread()th1.start() th2.start()常用方法
- threading.Thread(target=func, args=())
- start() # 啟動子線程
- join() # 阻塞子線程
- is_alive()/isAlive() # 判斷線程執行狀態,正在執行返回True,否則False
- daemon # 設置線程是否隨主線程退出而退出,默認False
- name # 設置線程名
線程鎖
import threadinglock = threading.Lock() # 生成鎖,全局唯一lock.acquire() # 加鎖lock.release() # 釋放鎖加鎖與解鎖必須成對出現,或者使用上下文管理器with來管理鎖。
可重入鎖
在Redis分布式鎖中提到過,用于讓非阻塞線程重復獲得鎖來發送或讀取數據,這里的可重入鎖僅指讓同一線程可以多次獲取鎖。
import threadingrlock = threading.RLock() # 生成可重入鎖死鎖
死鎖通常有兩種。
GIL全局鎖
多進程是真正的并行,而多線程是偽并行,實際是多個線程交替執行。
遇到GIL影響性能的情況,要么考慮用多進程替代多線程,要么更換Python解釋器。
線程通信
常用線程通信方法。
Event事件
import threadingevent = threading.Event()event.clear() # 重置event,使所有該event事件都處于待命狀態event.wait() # 等待接收event指令,決定是否阻塞程序執行evnet.set() # 發送event指令,所有該event事件的線程開始執行 import timeimport threadingclass MyThread(threading.Thread):def __init__(self, name, event):super().__init__()self.name = nameself.event = eventdef run(self):self.event.wait() # 等待event.set()才能執行下去time.sleep(1)print('{} Done'.format(self.name))threads = [] event = threading.Event()for i in range(5):threads.append(MyThread(event))event.clear() # 重置event,使event.wait()生效for t in threads:t.start()print('Waiting 3s') time.sleep(3)print('Awake all threads') event.set() # 發送event指令,所有綁定了event的線程開始執行所有線程在調用start()方法后并不會執行完,而是在event.wait()處停住了,需要發送event.set()指令才能繼續執行。
Condition狀態
import threadingcond = threading.Condition()cond.acquire()cond.release()cond.wait() # 等待指令觸發,同時臨時釋放鎖,直到調用notify才重新占有鎖cond.notify() # 發送指令Condition與Event很類似,不過由于wait()與notify()可以反復調用,因此一般作為編程人員可控調用鎖來使用,放在run()方法下。
Queue隊列
隊列是線程安全的,通過put()和get()方法來操作隊列。
from queue import Queueq = Queue(maxsize=0) # 設置0表示無限長隊列q.get(timeout=0.5) # 阻塞程序,等待隊列消息,可以設置超時時間q.put() # 發送消息q.join() # 等待所有消息被消費完# 不常用但要了解的方法 q.qsize() # 返回消息個數 q.empty() # 返回bool值,隊列是否空 q.full() # 返回bool值,隊列是否滿Queue是FIFO隊列,還有queue.LifoQueue,queue.PriorityQueue。
線程隔離
兩個線程的變量不能被相互訪問。
通常使用threading.local類來實現,該類的實例是一個字典型對象,直接通過key-value形式存入變量,如threading.local().name = 'bob'。
如果想要實現一個線程內的全局變量或實現線程間的信息隔離,就使用local類。
線程池
多線程并不是越多越好,因為在切換線程時會切換上下文環境(當然相比多進程的開銷要小的多),在量大時依然會造成CPU的開銷。
因此出現了線程池的概念,即預先創建好合適數量的線程,使任務能立刻使用。
通過concurrent.futures庫的ThreadPoolExecutor類來實現。
import time import threading from concurrent.futures import ThreadPoolExecutordef target():for i in range(5):print('{}-{}\n'.format(threading.get_ident(), i)time.sleep(1)pool = ThreadPoolExecutor(5) # 線程池數量限制為5for i in range(100):pool.submit(target) # 往線程中提交并運行協程
學習協程,要先理解生成器,因為Python的協程是從生成器中誕生并演變到現在這個樣子的。
可迭代、迭代器、生成器
可迭代對象,其類或元類都實現了__iter__()方法,而該方法返回一個對象支持迭代,既可以是string/list/tuple/dict等內置類型的對象,也可以是自己寫的對象(這個對象的類實現了遍歷元素的__iter__方法)。
迭代器對象,可迭代對象是迭代器的基礎,迭代器只是比可迭代對象多了一個__next__()方法,這個方法讓我們可以不再用for循環來獲取元素。
生成器對象,在迭代器的基礎上,實現了yield,相當于函數中的return,在每次for循環遍歷或調用next()時,都會返回一個值并阻塞等待下一次調用。
可迭代對象、迭代器都是將所有元素放在內存里,而生成器則是需要時臨時生成元素,所以生成器節省時間、空間。
如何運行/激活生成器
兩個方法。
這兩個方法是等價的,但由于send方法可以傳值進去,所以在協程中大有用處。
生成器的執行狀態
通過inspect庫的getgeneratorstate方法獲取狀態信息。
生成器的異常
StopIteration
從生成器過渡到協程:yield
生成器引入了函數暫停執行(yield)功能,后來又引入了向暫停的生成器發送信息的功能(send),并以此催生了協程。
協程是為非搶占式多任務產生子程序的計算機程序組件,協程允許不同入口點在不同位置暫停或開始執行程序。
協程和線程有相似點,多個協程之間與線程一樣,只會交叉串行執行;也有不同點,線程之間要頻繁切換,加鎖、解鎖,協程不需要。
協程通過yield暫停生成器,將程序的執行流程交給其它子程序,從而實現不同子程序之間的交替執行。
通過例子演示如何向生成器發送信息。
def func(n):index = 0while index < n:num = yield index # 這里分成兩部分,yield index將index return給外部程序, num = yield接受外部send的信息并賦值給numif num is None:num = 1index += numf = func(5) print(next(f)) # 0 print(f.send(2)) # 2 print(next(f)) # 3 print(f.send(-1)) # 2yield from語法
從Python3.3才出現的語法。
yield from后面需要添加可迭代對象(迭代器、生成器當然滿足要求)。
# 拼接一個可迭代對象 # 使用yield astr = 'ABC'alist = [1, 2, 3]adict = dict(name='kct', age=18)agen = (i for i in range(5))def gen(*args):for item in args:for i in item:yield inew_list = gen(astr, alist, adict, agen)print("use yield:", list(new_list))# 使用yield fromdef gen(*args):for item in args:yield from itemnew_flist = fgen(astr, alist, adict, agen)print("use yield from:", list(new_flist))可以看出,使用yield from可以直接從可迭代對象中yield所有元素,減少了一個for循環,代碼更簡潔,當然yield from不止做了這件事。
yield from后可以接生成器,以此形成生成器嵌套,yield from就幫我們處理了各種異常,讓我們只需專心于業務代碼即可。
具體講解yield from前先了解幾個概念:
舉個例子,實時計算平均值
# 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averagecount += 1total += numaverage = total/count# 委托生成器 def proxy_gen():while True:yield from average_gen()# 調用函數 def main():get_average = proxy_gen()next(get_average) # 第一次調用不傳值,讓子生成器開始運行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20委托生成器的作用是在調用函數與子生成器之間建立一個雙向通信通道,調用函數可以send消息給子生成器,子生成器yield值也是直接返回給調用函數。
有時會在yield from前作賦值操作,這是用于做結束操作,改造上面的例子。
# 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averageif num is None:breakcount += 1total += numaverage = total/countreturn total, count, average # 當協程結束時,調用return# 委托生成器 def proxy_gen():while True:total, count, average = yield from average_gen() # 只有子生成器的協程結束了才會進行賦值,后面的語句才會執行print('Count for {} times, Total is {}, Average is {}'.format(count, total, average))# 調用函數 def main():get_average = proxy_gen()next(get_average) # 第一次調用不傳值,讓子生成器開始運行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20get_average.send(None) # 結束協程,如果后面再調用send,將會另起一協程為什么不直接調用子生成器?
yield from做了全面的異常處理。直接調用子生成器,首先就要處理StopIteration異常,其次若子生成器不是協程生成器而是迭代器,則會有其它異常拋出,因此需要知道,委托生成器在這之中扮演著重要角色,不可忽略。
asyncio
asyncio是Python3.4引入的標準庫,直接內置對異步IO的支持。
雖然學了yield和yield from,但還是不知如何入手去做并發,asyncio則是為了提供這么個框架來精簡復雜的代碼操作。
如何定義創建協程
通過前面學習,我們知道調用函數/委托生成器/子生成器這三劍客中,子生成器就是協程,那么asyncio如何來定義創建協程呢?
asyncio通過在函數定義前增加async關鍵字來定義協程對象,通過isinstance(obj, Coroutine)即可判斷是否是協程,這個協程類從collections.abc包導入。
我們也知道,生成器是協程的基礎,那么有什么辦法將生成器變成協程來使用?
通過@asyncio.coroutine裝飾器可以標記生成器函數為協程對象,但是通過isinstance(obj, Generator)、isinstance(obj, Coroutine)仍然可以看到,這個生成器函數只是被標記為協程了,但其本質依然是生成器。
重要概念
協程工作流程
await和yield
這兩者都能實現暫停的效果,但功能是不兼容的,在生成器中不能用await,在async定義的協程中不能用yield。
并且,yield from后可接可迭代對象、迭代器、生成器、future對象、協程對象,await后只能接future對象、協程對象。
創建future對象
前面我們知道通過async可以定義一個協程對象,那么如何創建一個future對象呢?
答案是通過task,只需要創建一個task對象即可。
# 在前一個例子中,我們先創建了事件循環,然后通過事件循環創建了task,我們來測試下 import asyncio from asyncio.futures import Futureasync def hello(name):print('Hello, ', name)coroutine = hello('World')# 創建事件循環 loop = asyncio.get_event_loop()# 將協程轉換為任務 task = loop.create_task(coroutine)print(isinstance(task, Future)) # 結果是True# 不建立事件循環的方法 task = asyncio.ensure_future(coroutine)print(isinstance(task, Future)) # 結果也是True知道了創建future對象(也即是創建task對象)的方法,那么我們驗證下await和yield后接coroutine和future對象。
import sys import asyncioasync def f1():await asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f2():yield from asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)async def f3():await asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f4():yield from asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)tasks = [asyncio.ensure_future(f1()),asyncio.ensure_future(f2()),asyncio.ensure_future(f3()),asyncio.ensure_future(f4()) ]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())loop.close()綁定回調函數
異步IO都是在IO高的地方掛起,等IO操作結束后再繼續執行,大多數時候,我們后續的代碼執行都是需要依賴IO的返回值的,此時就要用到回調了。
回調的實現有兩種方式。
第一種,利用同步編程實現的回調
這種方法要求我們能夠取得協程的await的返回值。通過task對象的result()方法可以獲得返回結果。
import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)# 直接通過task獲取任務結果 print('Result: {}'.format(task.result()))第二種,通過asyncio自帶的添加回調函數功能實現
import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)def callback(future):print('Result: {}'.format(future.result()))coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)# 添加回調函數 task.add_done_callback(callback)loop.run_until_complete(task)協程中的并發
asyncio實現并發,就需要多個協程來完成任務,前面做await和yield的驗證時就用了并發。
每當有任務阻塞的時候就await,然后其他協程繼續工作。
第一步,創建多個協程的列表
# 協程函數 async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 協程對象 c1 = worker(1) c2 = worker(2) c3 = worker(4)# 協程轉換為task tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]loop = asyncio.get_event_loop()第二步,將列表注冊到事件循環中
有兩種方法,這兩種方法的區別后面說。
return的結果可以通過task.result()查看。
# asyncio.wait() loop.run_until_complete(asyncio.wait(tasks))# asyncio.gather() loop.run_until_complete(asyncio.gather(*tasks)) # *不能省略# 查看結果 for task in tasks:print('Result: {}'.format(task.result()))協程中的嵌套
使用async可以定義協程,協程用于耗時的IO操作,我們也可以封裝更多的IO操作過程,實現一個協程中await另一個協程,實現協程的嵌套。
# 內部協程函數 async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 外部協程函數 async def main():c1 = worker(1)c2 = worker(2)c3 = worker(4)tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]dones, pendings = await asyncio.wait(tasks)for task in tasks:print('Result: {}'.format(task.result()))loop = asyncio.get_event_loop() loop.run_until_complete(main())如果外部協程使用的asyncio.gather(),那么作如下替換。
results = await asyncio.gather(*tasks)for result in results:print('Result: {}'.format(result))協程中的狀態
講生成器時提到了四種狀態,對協程我們也了解一下其狀態(準確地說是future/task對象的狀態)。
gather和wait
接收的參數不同
wait接收的tasks,必須是一個list對象,該list對象中存放多個task,既可以通過asyncio.ensure_future轉為task對象也可以不轉。
gather也可以接收list對象,但*不能省,也可以直接將多個task作為可變長參數傳入,參數可以是協程對象或future對象。
返回結果不同
wait返回dones和pendings,前者表示已完成的任務,后者表示未完成的任務,需要通過task.result()手工獲取結果。
gather直接將值返回。
協程控制功能
# FIRST_COMPLETED:完成第一個任務就返回 # FIRST_EXCEPTION:產生第一個異常就返回 # ALL_COMPLETED:所有任務完成再返回(默認選項) dones, pendings = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))# 控制運行時間:1秒后返回 dones, pendings = loop.run_until_complete(asyncio.wait(tasks, timeout=1))動態添加協程
在asyncio中如何動態添加協程到事件循環中?
兩種方法,一種是同步的,一種是異步的。
import time import asyncio from queue import Queue from threading import Thread# 在后臺永遠運行的事件循環 def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def do_sleep(x, queue, msg=""):time.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動態添加兩個協程 # 這種方法在主線程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First') new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second')while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime()) import time import asyncio from queue import Queue from threading import Thread# 在后臺永遠運行的事件循環 def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_sleep(x, queue, msg=""):await asyncio.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動態添加兩個協程 # 這種方法在主線程是異步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop)while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime())轉載于:https://www.cnblogs.com/ikct2017/p/9534557.html
總結
以上是生活随笔為你收集整理的asyncio协程与并发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 0821-0823
- 下一篇: 关于js渲染网页时爬取数据的思路和全过程