Python | threading03 - 使用条件对象,实现线程间的同步
文章目錄
- 一、前言
- 二、生產者-消費者的模型
- 2.1、代碼
- 2.2、運行
- 2.3、wait( )方法會將互斥鎖釋放
- 三、條件同步 - threading.Condition( )
- 3.1、相關API
- 3.2、acquire( )
- 3.2、release( )
- 3.3、wait( )
- 3.4、notify(n=1)
- 3.5、notify_all( )
- 3.6、wait_for(predicate,timeout=None)
- 四、with語句讓代碼更加簡潔
- 4.1、代碼
- 4.2、運行結果
一、前言
上一篇博文學習了線程的互斥鎖(Python | threading02 - 互斥鎖解決多個線程之間隨機調度,造成“線程不安全”的問題 ),線程的互斥鎖的目的是解決“線程不安全”的問題。值得注意的是互斥鎖解決了”線程不安全“問題,但產生了另外一些問題,如“死鎖”,“加鎖不合理導致程序的效率低下”等。
今天繼續學習python線程的另外一個工具 - 條件對象,條件對象用于實現線程間的同步。
官方文檔:
threading - Thread-based parallelism - Python 3.10.1 documentation
二、生產者-消費者的模型
2.1、代碼
代碼的目的:
- customer每隔2秒將item減少1(可以理解為消耗一個item)。當發現item=0時(即沒有item可以消耗時,趕緊通知producer生產item)。
- producer每隔1秒將item增加1。當item等于5時(可以理解為沒有地方存放item了),馬上停止生產item。
2.2、運行
運行的結果:
2.3、wait( )方法會將互斥鎖釋放
剛開始我就犯了一個錯誤,c.wait( )之后緊跟著c.release( )。根據《Python并行編程》的解釋,wait( )方法會將互斥鎖釋放回去。
三、條件同步 - threading.Condition( )
3.1、相關API
- acquire() — 線程鎖,注意線程條件變量Condition中的所有相關函數使用必須在acquire() /release() 內部操作;
- release() — 釋放鎖,注意線程條件變量Condition中的所有相關函數使用必須在acquire() /release() 內部操作;
- wait(timeout) — 線程掛起(阻塞狀態),直到收到一個notify通知或者超時才會被喚醒繼續運行(超時參數默認不設置,可選填,類型是浮點數,單位是秒)。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError;
- notify(n=1) — 通知其他線程,那些掛起的線程接到這個通知之后會開始運行,缺省參數,默認是通知一個正等待通知的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError,notify()不會主動釋放Lock;
- notifyAll() — 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程;
- wait_for(predicate,timeout=None) — 這個實用方法會重復地調用wait( ),直到滿足判斷式或者發生超時。
3.2、acquire( )
從代碼可以看到,使用c.notify( )與c.wait( )之前需要先調用c.acquire( )獲取互斥鎖,否則會拋出異常。
3.2、release( )
沒什么好說的,就是釋放互斥鎖。
3.3、wait( )
讓調用wait( )的線程進入阻塞態,等待其他線程調用notify( )來進行同步,可以設置timeout。
3.4、notify(n=1)
讓其他調用wait( )的線程從阻塞態恢復至運行態,繼續運行后面的代碼。
3.5、notify_all( )
跟notify( )類似,notify_all( )不止喚醒一個線程,而是喚醒所有因為wait( )而進入阻塞態的線程。
3.6、wait_for(predicate,timeout=None)
這個wait_for相當于以下代碼(來自官方的解釋):
while not predicate():c.wait()使用代碼測試一下wait_for( )的用法。
# python3.9 import threading import timec = threading.Condition() # 聲明一個條件對象 item = 0 # 用于計數def customer_Wait_For():""""""global itemif item == 0:print("子線程%s發現item = %d,time = %s,將進入阻塞態" % (threading.current_thread().getName(),item,time.perf_counter()))print("通知producer恢復運行(解除阻塞態)")c.notify() # 通知producer線程解除阻塞態return False # 相當于運行c.wait()else:return True # 相當于不運行c.wait()def producer_Wait_For():""""""global itemif item == 5:print("子線程%s發現item = %d,time = %s,將進入阻塞態" % (threading.current_thread().getName(),item,time.perf_counter()))return False # 相當于運行c.wait()else:return True # 相當于不運行c.wait()def producer():"""子線程-生產者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(1) # 線程休眠1秒c.acquire() # 獲取鎖c.wait_for(producer_Wait_For) # 看看是否需要阻塞item += 1 # 每一秒增加1print("子線程%s將item增加1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.notify() # 通知customer解除阻塞態c.release() # 釋放鎖def customer():"""子線程-消費者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(2) # 線程休眠2秒c.acquire() #獲取鎖c.wait_for(customer_Wait_For) # 看看是否需要阻塞item -= 1print("子線程%s將item減少1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.release() # 釋放鎖def main():t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 創建producer子線程t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 創建customer子線程t1.start() # 啟動customer線程 t2.start() # 啟動producer線程t1.join() # 子線程customer是無限循環的線程,所以主線程需要等待它運行結束t2.join() # 子線程producer是無限循環的線程,所以主線程需要等待它運行結束print("主線程運行結束!")if __name__ == "__main__":main()從運行的結果看來,跟c.wait_for()只是將條件判斷(是否阻塞)封裝在一個函數里面而已,使得線程的函數變得更加簡潔。
四、with語句讓代碼更加簡潔
從官方文檔了解到,條件對象也支持with語句。
4.1、代碼
從下面的代碼可以看到,with語句幫我們管理了acquire( )方法與release( )方法(代碼里找不到acquire( )方法與release( )方法了。)
# python3.9 import threading import timec = threading.Condition() # 聲明一個條件對象 item = 0 # 用于計數def producer():"""子線程-生產者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(1) # 線程休眠1秒with c:# 如果item等于5,就進行阻塞if item == 5:print("子線程%s發現item = %d,time = %s,將進入阻塞態" % (threading.current_thread().getName(),item,time.perf_counter()))c.wait() # 進入阻塞態,并釋放鎖,,等待其他線程notifyitem += 1 # 每一秒增加1print("子線程%s將item增加1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.notify() # 通知customer解除阻塞態def customer():"""子線程-消費者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(2) # 線程休眠2秒with c:if item == 0:print("子線程%s發現item = %d,time = %s,將進入阻塞態" % (threading.current_thread().getName(),item,time.perf_counter()))print("通知producer恢復運行(解除阻塞態)")c.notify() # 通知producer線程解除阻塞態c.wait() # 進入阻塞態,并釋放鎖。等待其他線程notify item -= 1print("子線程%s將item減少1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))def main():t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 創建producer子線程t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 創建customer子線程t1.start() # 啟動customer線程 t2.start() # 啟動producer線程t1.join() # 子線程customer是無限循環的線程,所以主線程需要等待它運行結束t2.join() # 子線程producer是無限循環的線程,所以主線程需要等待它運行結束print("主線程運行結束!")if __name__ == "__main__":main()4.2、運行結果
總結
以上是生活随笔為你收集整理的Python | threading03 - 使用条件对象,实现线程间的同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: f分布表完整图a=0.05_2019年0
- 下一篇: python连接impala_pytho