Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。
例子
#并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂from multiprocessing import Processimport os,timedef work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())?if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()
?
加鎖后
#加鎖后由并發變成了串行,犧牲了運行效率,但避免了競爭?from multiprocessing import Process,Lockimport os,timedef work(mutex):mutex.acquire() #開始加鎖print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())mutex.release() #釋放鎖,在加鎖期間別的進程都要等?if __name__ == '__main__':mutex = Lock()for i in range(3):p=Process(target=work,args=(mutex,))p.start()
?
例子2
多個進程共享同一文件
文件當數據庫,模擬搶票
?
未加鎖版
#文件db.txt的內容為:{"count":1}#注意一定要用雙引號,不然json無法識別?# 并發運行,效率高,但競爭寫同一文件,數據寫入錯亂from multiprocessing import Process,Lockimport time,json,random,osdef search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])?def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('%s\033[43m購票成功\033[0m'%(os.getpid()))?def task(lock):search()get()if __name__ == '__main__':lock=Lock()for i in range(10): #模擬并發10個客戶端搶票p=Process(target=task,args=(lock,))p.start()
輸出結果
剩余票數1剩余票數1剩余票數1剩余票數1剩余票數1剩余票數1剩余票數1剩余票數1剩余票數1剩余票數14120購票成功2692購票成功7328購票成功13444購票成功13632購票成功13560購票成功13752購票成功12564購票成功13720購票成功13488購票成功
加鎖版
?
import multiprocessing,time,json,random?def search(name):with open("db.txt","r",encoding="utf-8") as f:data_dic = json.load(f)time.sleep(random.uniform(0,2))if data_dic["count"] >= 1 :print("已查詢到票還有%s張,當前系統時間 %s"%(data_dic["count"],time.asctime()))else:print("系統票源不足!當前系統時間 %s"%time.asctime())?def buy(name):with open("db.txt","r+",encoding="utf-8") as f:data_dic = json.load(f)if data_dic["count"] > 0 :with open("db.txt", "w", encoding="utf-8") as g:new_ticket_count = data_dic["count"] - 1data_dic.update({"count":new_ticket_count})json.dump(data_dic,g)print("%s購票成功!"%name)else:print("%s購票失敗!"%name)?def task(name,mutex):search(name) # 查詢無需加鎖mutex.acquire()buy(name) #針對修改文件的關鍵操作加鎖mutex.release()??if __name__ == "__main__":mutex = multiprocessing.Lock()for i in range(10):p = multiprocessing.Process(target=task,args=("乘客%s"%i,mutex))p.start()
分析
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。雖然可以用文件共享數據實現進程間通信,但問題是:1.效率低(共享數據基于文件,而文件是硬盤上的數據)2.需要自己加鎖處理???#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。1 隊列和管道都是將數據存放于內存中2 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
?
二、信號量(multiprocess.Semaphore)
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。實現:信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念。
例子
# 多進程中的組件# ktv# 4個# 一套資源 同一時間 只能被n個人訪問# 某一段代碼 同一時間 只能被n個進程執行import timeimport randomfrom multiprocessing import Processfrom multiprocessing import Semaphore?# sem = Semaphore(4)# sem.acquire()# print('拿到第一把鑰匙')# sem.acquire()# print('拿到第二把鑰匙')# sem.acquire()# print('拿到第三把鑰匙')# sem.acquire()# print('拿到第四把鑰匙')# sem.acquire()# print('拿到第五把鑰匙')def ktv(i,sem):sem.acquire() #獲取鑰匙print('%s走進ktv'%i)time.sleep(random.randint(1,5))print('%s走出ktv'%i)sem.release()??if __name__ == '__main__' :sem = Semaphore(4)for i in range(20):p = Process(target=ktv,args=(i,sem))p.start()
?
三、事件(multiprocess.Event)
python進程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
clear:將“Flag”設置為False,set:將“Flag”設置為True.
?
例子
?from multiprocessing import Event?e = Event()print(e.is_set()) #初始設置為Falseprint("數據111")e.set() #設置之后為Trueprint("數據222")print(e.is_set()) #打印設置之后的狀態e.wait() #當值為False會阻塞,當值為Ture是,不會阻塞print("數據333")e.clear() #清除事件狀態,設置為Falseprint(e.is_set()) #打印清除之后的狀態print("數據444")e.wait() #此時值為False,程序會一直阻塞print("數據555")
?
輸出結果
False數據111數據222True數據333False數據444
?
例子
簡單的紅綠燈事件
from multiprocessing import Event,Processimport timeimport random??def cars(e,num):if not e.is_set(): # 進程剛開啟,is_set()的值是False,模擬信號燈為紅色print("%s車正在等待通行"%num)e.wait() # 阻塞,等待信號燈切換print("%s車已經通過" % num) #打印已經通過的進程??def light(e):?#模擬定時切換紅綠燈while True:if e.is_set():e.clear() #>將is_set()的值設置為Falseprint("\033[31m紅燈亮了\033[0m")else:e.set() #>將is_set()的值設置為Trueprint("\033[32m綠燈亮了\033[0m")time.sleep(2)?if __name__ == "__main__":e = Event()traffic = Process(target=light,args=(e,))traffic.start() #啟動紅綠燈進程for i in range(20):car = Process(target=cars,args=(e,"布加迪%s"%i))car.start()time.sleep(random.random())
?
四、進程間通信——隊列和管道
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
隊列
隊列就相當于一個容器,里面可以放數據,特點是先放進去先拿出來,即先進先出。
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.q.get_nowait():同q.get(False)q.put_nowait():同q.put(False)?q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
其他方法(了解):
q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,后臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。?q.cancel_join_thread() 不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。?q.join_thread() 連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
例子
from multiprocessing import Queue?q = Queue(3) # 創建一個隊列對象,并給他設置容器大小,即能放幾個數據q.put(1) # put()方法是往容器里放數據q.put([2,3])q.put({"k1":4})# q.put("mi") # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。try:q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。print('隊列已經滿了')?# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。print(q.full()) #返回True ,滿了print(q.get()) #get()方法是從容器里拿數據print(q.get())print(q.get())# 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。try:q.get_nowait() # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。print('隊列已經空了')?print(q.empty()) #空了
例子
import timefrom multiprocessing import Queue, Process??def task(q):q.put(" hello! 時間%s"%time.asctime()) # 調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。??if __name__ == '__main__':q = Queue(3)#創建一個Queue對象p = Process(target=task, args=(q,)) #創建一個子進程p.start()print(q.get()) #在主進程打印從子進程獲取的數據
? ?
?
生產者消費者模型
生產者消費者模型
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在并發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
?
基于隊列實現生產者消費者模型
from multiprocessing import Process, Queueimport time, random, os??def consumer(q):while True:res = q.get()time.sleep(random.randint(1, 3))print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))??def producer(q):for i in range(10):time.sleep(random.randint(1, 3))res = '包子%s' % iq.put(res)print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))??if __name__ == '__main__':q = Queue()# 生產者們:即廚師們p1 = Process(target=producer, args=(q,))?# 消費者們:即吃貨們c1 = Process(target=consumer, args=(q,))?# 開始p1.start()c1.start()print('主')
生產者消費者模型總結
#程序中有兩類角色一類負責生產數據(生產者)一類負責處理數據(消費者)#引入生產者消費者模型為了解決的問題是:平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度#如何實現:生產者<-->隊列<——>消費者#生產者消費者模型實現類程序的解耦和
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環
import time, random, osfrom multiprocessing import Process, Queue??def consumer(q):while True:res = q.get()if res is None: break # 收到結束信號則結束time.sleep(random.randint(1, 3))print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))??def producer(q):for i in range(10):time.sleep(random.randint(1, 3))res = '包子%s' % iq.put(res)print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))q.put(None) # 發送結束信號,生產者在生產完畢后發送結束信號None??if __name__ == '__main__':q = Queue()# 生產者們:即廚師們p1 = Process(target=producer, args=(q,))?# 消費者們:即吃貨們c1 = Process(target=consumer, args=(q,))?# 開始p1.start()c1.start()print('主')
?
注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號。但上述解決方式,在有多個生產者和多個消費者時,需要多次發送None信號。
import multiprocessingimport timeimport random??def producer(name, q):for i in range(2):res = "包子%s" % itime.sleep(random.randint(0, 1))print("%s生產了%s" % (name, res))q.put(res)??def consumer(name, q):while True:res = q.get()if q.get() is None: # 收到結束信號則結束print("沒包子吃了")breakprint("%s吃了%s" % (name, res))??if __name__ == "__main__":q = multiprocessing.Queue()p1 = multiprocessing.Process(target=producer, args=("jack", q))p2 = multiprocessing.Process(target=producer, args=("charles", q))p3 = multiprocessing.Process(target=producer, args=("pony", q))c1 = multiprocessing.Process(target=consumer, args=("nick", q))c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))p_list = []p_list.append(p1)p_list.append(p2)p_list.append(p3)for p in p_list:p.start()c1.start()c2.start()p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號p2.join()p3.join()q.put(None) # 發送結束信號,有幾個消費者就應該發送幾次結束信號Noneq.put(None) # 發送結束信號print("end........")
這里有另外一種隊列提供了這種機制,JoinableQueue。
JoinableQueue([maxsize])
其實就是一種隊列,但又比隊列要多兩種方法,task_done()和join()方法,正是有這兩種方法就可以解決上面的問題。
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
?
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:?q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。?q.join() 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。
例子
import multiprocessing import time import randomdef producer(name, q):for i in range(2):res = "包子%s" % itime.sleep(random.randint(0, 1))print("%s生產了%s" % (name, res))q.put(res)q.join() # 只有顧客把隊列的包子全部拿走后,三個生產者進程才能全部結束def consumer(name, q):while True:res = q.get()print("%s吃了%s" % (name, res))q.task_done() # 發信號告訴隊列,又吃完了一個,從隊列中取走一個數據并處理完成if __name__ == "__main__":# q = multiprocessing.Queue()q = multiprocessing.JoinableQueue()p1 = multiprocessing.Process(target=producer, args=("jack", q))p2 = multiprocessing.Process(target=producer, args=("charles", q))p3 = multiprocessing.Process(target=producer, args=("pony", q))c1 = multiprocessing.Process(target=consumer, args=("nick", q))c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))p_list = []p_list.append(p1)p_list.append(p2)p_list.append(p3)for p in p_list:p.start()c1.daemon = True # 將c1\c2設置成守護進程,只要主進程結束了,那么顧客就收到了所有的數據c2.daemon = Truec1.start()c2.start()p1.join()p2.join()p3.join()print("end........") # 主進程等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據 # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。 # 應該隨著主進程的結束而結束,所以設置成守護進程就可以了。
轉載于:https://www.cnblogs.com/Nicholas0707/p/10203587.html
總結
以上是生活随笔為你收集整理的Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 爬蜥学习之旅
- 下一篇: 聊聊flink的Tumbling Win