Python并发之协程gevent基础
基本示例
from gevent import monkey monkey.patch_all() # 記住一定放在第一行,這里是打補丁的意思,time模塊在使用協程gevent模塊的時候,必須打補丁才行,記得放在第一行。 import gevent import timedef eat(name):print(f"{name} eat first")time.sleep(3)print(f"{name} eat second")def play(name):print(f"{name} play phone 1")time.sleep(2)print(f"{name} play phone 2")g1 = gevent.spawn(eat, "lily") g2 = gevent.spawn(play, name="lily") g1.join() g2.join()?
1,gevent介紹
gevent是第三方庫,通過?greenlet?實現?coroutine,創建、調度的開銷比?線程(thread)?還小,因此程序內部的?執行流?效率高。
gevent 實現了 python 標準庫中一些阻塞庫的非阻塞版本,如 socket、os、select 等 (全部的可參考?gevent1.0 的 monkey.py 源碼),可用這些非阻塞的庫替代 python 標準庫中的阻塞的庫。
gevent 提供的 API 與 python 標準庫中的用法和名稱類似。
其基本思想是:當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程序處于等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
gevent是基于協程的Python網絡庫。特點:
- 基于libev的快速事件循環(Linux上epoll,FreeBSD上kqueue)。
- 基于greenlet的輕量級執行單元。
- API的概念和Python標準庫一致(如事件,隊列)。
- 可以配合socket,ssl模塊使用。
- 能夠使用標準庫和第三方模塊創建標準的阻塞套接字(gevent.monkey)。
- 默認通過線程池進行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環境變量開啟)。
- TCP/UDP/HTTP服務器
- 子進程支持(通過gevent.subprocess)
- 線程池
gevent常用方法:
gevent.spawn()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 創建一個普通的Greenlet對象并切換
gevent.spawn_later(seconds=3)?? ?延時創建一個普通的Greenlet對象并切換
gevent.spawn_raw()? ? ? ? ? ? ? ? ? ? ? ?創建的協程對象屬于一個組
gevent.getcurrent()? ? ? ? ? ? ? ? ? ? ? ? ?返回當前正在執行的greenlet
gevent.joinall(jobs)? ? ? ? ? ? ? ? ? ? ? ? ? 將協程任務添加到事件循環,接收一個任務列表
gevent.wait()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 可以替代join函數等待循環結束,也可以傳入協程對象列表
gevent.kill()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 殺死一個協程
gevent.killall()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?殺死一個協程列表里的所有協程
monkey.patch_all()? ? ? ? ? ? ? ? ? ? ? ? ? ? 非常重要,會自動將python的一些標準模塊替換成gevent框架
greenlet常用實例方法:
# Greenlet對象 from gevent import Greenlet# Greenlet對象創建 job = Greenlet(target0, 3) Greenlet.spawn() # 創建一個協程并啟動 Greenlet.spawn_later(seconds=3) # 延時啟動# 協程啟動 job.start() # 將協程加入循環并啟動協程 job.start_later(3) # 延時啟動# 等待任務完成 job.join() # 等待任務完成 job.get() # 獲取協程返回的值# 任務中斷和判斷任務狀態 job.dead() # 判斷協程是否死亡 job.kill() # 殺死正在運行的協程并喚醒其他的協程,這個協程將不會再執行,可以 job.ready() # 任務完成返回一個真值 job.successful() # 任務成功完成返回真值,否則拋出錯誤# 獲取屬性 job.loop # 時間循環對象 job.value # 獲取返回的值# 捕捉異常 job.exception # 如果運行有錯誤,獲取它 job.exc_info # 錯誤的詳細信息# 設置回調函數 job.rawlink(back) # 普通回調,將job對象作為回調函數的參數 job.unlink() # 刪除回調函數 # 執行成功的回調函數 job.link_value(back) # 執行失敗的回調函數 job.link_exception(back)?gevent.Pool的特殊方法:
pool.wait_available():等待直到有一個協程有結果 pool.dd(greenlet):向進程池添加一個方法并跟蹤,非阻塞 pool.discard(greenlet):停止跟蹤某個協程 pool.start(greenlet):加入并啟動協程 pool.join():阻塞等待結束 pool.kill():殺死所有跟蹤的協程 pool.killone(greenlet):殺死一個協程2,什么時候用/不用gevent
gevent 的優勢:
- 可以通過同步的邏輯實現并發操作,大大降低了編寫并行/并發程序的難度
- 在一個進程中使用 gevent 可以有效避免對?臨界資源?的互斥訪問
如果程序涉及較多的 I/O,可用 gevent 替代多線程來提高程序效率。但由于
- gevent 中 coroutine 的調度是由使用者而非操作系統決定
- 主要解決的是 I/O 問題,提高?IO-bound?類型的程序的效率
- 由于是在一個進程中實現 coroutine,且操作系統以進程為單位分配處理機資源 (一個進程分配一個處理機)
?
因此,gevent 不適合在以下場景中使用:
- 對任務延遲有要求的場景,如交互式程序中 (此時需要操作系統進行?公平調度)
- CPU-bound?任務
- 當需要使用多處理機時 (可通過運行多個進程,每個進程內實現 coroutine 來解決這個問題)
3,gevent操作
如何生成 greenlet instance
一般有兩種方法:
- 使用?gevent.spawn()?API
- subclass?Greenlet
第一種方法是調用了?Greenlet?class 中的?spawn?類方法,且生成 greenlet instance 后將其放入 coroutine 的調度隊列中。第二種方法需要手動通過?instance.start()?方法手動將其加入到 coroutine 的調度隊列中。
代碼示例:
需要注意:
- 若僅是想生成 greenlet instance 并置于調度隊列中,最好采用?gevent.spawn()?API
- 若想僅生成 greenlet instance 且暫時不想加入到調度隊列,則可采用第二種方法。之后若想將其加入到調度隊列,則手動執行?instance.start()?方法。
?
如何進行主線程到 hub greenlet instance 的切換
- gevent.sleep()
- Greenlet 或 Greenlet 子類的 instance 的?join()?方法
- monkey patch 的庫或方法 (參見?monkey.py):
- ? ? socket
- ? ? ssl
- ? ? os.fork
- ? ? time.sleep
- ? ? select.select
- ? ? thread
- ? ? subprocess
- ? ? sys.stdin,sys.stdout,sys.stderr
4,gevent核心功能
- Greenlets
- 同步和異步執行
- 確定性
- 創建Greenlets
- Greenlet狀態
- 程序停止
- 超時
- 猴子補丁
?
4.1,Greenlets
在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
????一個 “greenlet” 是一個小型的獨立偽線程。可以把它想像成一些棧幀,棧底是初始調用的函數,而棧頂是當前greenlet的暫停位置。你使用greenlet創建一堆這樣的堆棧,然后在他們之間跳轉執行。跳轉必須顯式聲明的:一個greenlet必須選擇要跳轉到的另一個greenlet,這會讓前一個掛起,而后一個在此前掛起處恢復執行。不同greenlets之間的跳轉稱為切換(switching) 。
??????greenlet不是一種真正的并發機制,而是在同一線程內,在不同函數的執行代碼塊之間切換,實施“你運行一會、我運行一會”,并且在進行切換時必須指定何時切換以及切換到哪。
greenlet類主要有兩個方法:
- switch:用來切換協程;
- throw():用來拋出異常同時終止程序;
4.2,同步和異步執行
并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。在gevent里面,上下文切換是通過yielding來完成的.
當我們在受限于網絡或IO的函數中使用gevent,這些函數會被協作式的調度, gevent的真正能力會得到發揮。Gevent處理了所有的細節, 來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。
示例如下:
例子中的select()函數通常是一個在各種文件描述符上輪詢的阻塞調用。
import time import gevent start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1():print('Started Polling: %s' % tic())select.select([], [], [], 1)print('Ended Polling: %s' % tic()) def gr2():print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic()) def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(3)print('Ended Polling: %s' % tic()) gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])輸出:
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 1.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 3.0 seconds同步vs異步
下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。
import gevent import randomdef task(pid):gevent.sleep(random.randint(0,2)*0.001)print('task {} done'.format(pid))def synchronous():for i in range(5):task(i)def asynchronous():gev_list = [gevent.spawn(task, i) for i in range(5)]gevent.joinall(gev_list)print("synchronous:") synchronous()print("asynchronous:") asynchronous()運行結果:
synchronous: task 0 done task 1 done task 2 done task 3 done task 4 done asynchronous: task 4 done task 3 done task 0 done task 1 done task 2 done上例中,在同步的部分,所有的task都同步的執行, 結果當每個task在執行時主流程被阻塞(主流程的執行暫時停住)。
程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall?函數,后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。
要重點留意的是,異步的部分本質上是隨機的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執行。
?
4.3,確定性
greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產生相同的輸出。
下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。
?
執行結果
False True即使gevent通常帶有確定性,當開始與如socket或文件等外部服務交互時, 不確定性也可能溜進你的程序中。因此盡管gevent線程是一種“確定的并發”形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。
涉及并發長期存在的問題就是競爭條件(race condition)(當兩個并發線程/進程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件),這會導致資源修改的結果狀態依賴于時間和執行順序。 這個問題,會導致整個程序行為變得不確定。
解決辦法: 始終避免所有全局的狀態.
?
4.4,創建Greenlets
gevent對Greenlet初始化提供了一些封裝.
import gevent from gevent import Greenlet def foo(message, n):gevent.sleep(n)print(message) thread1 = Greenlet.spawn(foo, "Hello", 1) thread2 = gevent.spawn(foo, "I live!", 2) thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] gevent.joinall(threads)執行結果:
Hello I live!除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。
import gevent from gevent import Greenlet class MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()執行結果
Hi there!4.5,Greenlet狀態
greenlet的狀態通常是一個依賴于時間的參數:
- started – Boolean, 指示此Greenlet是否已經啟動
- ready() – Boolean, 指示此Greenlet是否已經停止
- successful() – Boolean, 指示此Greenlet是否已經停止而且沒拋異常
- value – 任意值, 此Greenlet代碼返回的值
- exception – 異常, 此Greenlet內拋出的未捕獲異常
代碼示例:
?
執行結果
True True You failed. win game None True True True False Traceback (most recent call last):File "src/gevent/greenlet.py", line 716, in gevent._greenlet.Greenlet.runFile "coroutine.py", line 121, in failraise Exception('You failed.') Exception: You failed. 2019-01-22T09:05:05Z <Greenlet "Greenlet-0" at 0x103d02848: fail> failed with Exception4.6,程序停止
當主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執行。這導致了所謂的僵尸進程, 它需要在Python解釋器之外被kill掉。
通用的處理模式就是在主程序中監聽SIGQUIT信號,調用gevent.shutdown退出程序。
?
4.7,超時
通過超時可以對代碼塊兒或一個Greenlet的運行時間進行約束。
import gevent from gevent import Timeout seconds = 3 timeout = Timeout(seconds) timeout.start()def wait():gevent.sleep(4)try:gevent.spawn(wait).join() except Timeout:print('Could not complete')執行結果:
Could not complete超時類
import gevent from gevent import Timeouttime_to_wait = 5class TimeLong(Exception):passwith Timeout(time_to_wait, TimeLong):gevent.sleep(6)4.8,猴子補丁(Monkey patching)
我們現在來到gevent的死角了. 在此之前,我已經避免提到猴子補丁(monkey patching) 以嘗試使gevent這個強大的協程模型變得生動有趣,但現在到了討論猴子補丁的黑色藝術 的時候了。你之前可能注意到我們提到了monkey.patch_socket()這個命令,這個 純粹副作用命令是用來改變標準socket庫的。
?
執行結果:
<class 'socket.socket'> After monkey patch <class 'gevent._socket3.socket'> <built-in function select> After monkey patch <function select at 0x1074631e0>Python的運行環境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個一般說來令人驚奇的壞主意,因為它創造了“隱式的副作用”,如果出現問題 它很多時候是極難調試的。雖然如此,在極端情況下當一個庫需要修改Python本身 的基礎行為的時候,猴子補丁就派上用場了。在這種情況下,gevent能夠修改標準庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變為協作式運行。
例如,Redis的python綁定一般使用常規的tcp socket來與redis-server實例通信。 通過簡單地調用gevent.monkey.patch_all(),可以使得redis的綁定協作式的調度 請求,與gevent棧的其它部分一起工作。
這讓我們可以將一般不能與gevent共同工作的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。
?
參考文獻:
https://blog.csdn.net/xumesang/article/details/53288363
http://blog.chinaunix.net/uid-9162199-id-4738168.html
https://www.cnblogs.com/cwp-bg/p/9593405.html
?
?
?
總結
以上是生活随笔為你收集整理的Python并发之协程gevent基础的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 判断当前用户是否为root
- 下一篇: Linux Epoll ET模式EPOL