多进程进阶
一 multiprocessing模塊介紹
? python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
? ? multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
?multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
? ? 需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內。
?
二 Process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前還沒有實現,庫引用中提示必須是None;?
target: 要執行的方法;?
name: 進程名;?
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():不管任務是否完成,立即停止工作進程
屬性:
daemon:和線程的setDeamon功能一樣
name:進程名字。
pid:進程號。
?
三 Process類的使用
創建并開啟子進程的兩種方式
import time import random from multiprocessing import Processdef foo(name):print('%s start' %name)time.sleep(random.randrange(1,6))print('%s end' %name)p1 = Process(target=foo,args=('Tom',)) #必須加 逗號 p2 = Process(target=foo,args=('Jerry',)) p3 = Process(target=foo,args=('Guido',))if __name__ == '__main__':p1.start()p2.start()p3.start()print('主線程') 方法一 import time import random from multiprocessing import Processclass Foo(Process):def __init__(self,name):super().__init__()self.name = namedef run(self):print('%s start' %self.name)time.sleep(random.randrange(1,6))print('%s ---> end' %self.name)p1 = Foo('Tom') p2 = Foo('Jerry') p3 = Foo('Guido')if __name__ == '__main__':p1.start() #start會自動調用run p2.start()p3.start()print('主線程') 方式二?
進程之間的內存空間是隔離的
from multiprocessing import Processn = 100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了def work():global nn = 0print('子進程:',n)if __name__ == '__main__':p = Process(target=work)p.start()print('主進程:',n)#結果: 主進程: 100 子進程: 0 View Code?
Process對象的join方法
import time import random from multiprocessing import Processclass Foo(Process):def __init__(self,name):super().__init__()self.name = namedef run(self):print('%s start' %self.name)time.sleep(random.randrange(1,6))print('%s ---> end' %self.name)p = Foo('Tom')if __name__ == '__main__':p.start()p.join(0.0001) #等待p停止,等0.0001秒就不再等了print('開始') join:主進程等,等待子進程結束 import time import random from multiprocessing import Processdef foo(name):print('%s start' %name)time.sleep(random.randrange(1,6))print('%s ---> end' %name)p1 = Process(target=foo,args=('Tom',)) #必須加 逗號 p2 = Process(target=foo,args=('Jerry',)) p3 = Process(target=foo,args=('Guido',))if __name__ == '__main__':p1.start()p2.start()p3.start()# 疑問:既然join是等待進程結束,那么我像下面這樣寫,進程不就又變成串行的了嗎?# 當然不是了,必須明確:p.join()是讓誰等?# 很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,# 詳細解析如下:# 進程只要start就會在開始運行了,所以p1-p3.start()時,系統中已經有四個并發的進程了# 而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵# join是讓主線程等,而p1-p3仍然是并發執行的,p1.join的時候,其余p2,p3仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join直接通過檢測,無需等待# 所以3個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 p1.join()p2.join()p3.join()print('主線程')#上述啟動進程與join進程可以簡寫為 p_l = [p1,p2,p3]for p in p_l:p.start()for p in p_l:p.join() 有了join,程序不就是串行了嗎??
注意:在windows中Process()必須放到# if __name__ == '__main__':下
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() insideif __name__ == "__main__" since statements inside this if-statement will not get called upon import. 由于Windows沒有fork,多處理模塊啟動一個新的Python進程并導入調用模塊。 如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。 解釋?
四 守護進程
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止 其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止?
import time import random from multiprocessing import Processclass Foo(Process):def __init__(self,name):super().__init__()self.name = namedef run(self):print('%s start' %self.name)time.sleep(random.randrange(1,6))print('%s ---> end' %self.name)p = Foo('Tom')if __name__ == '__main__':p.daemon = True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,并且父進程代碼執行結束,p即終止運行 p.start()print('主進程') View Code #主進程代碼運行完畢,守護進程就會結束 from multiprocessing import Process import time def foo():print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456")p1=Process(target=foo) p2=Process(target=bar)p1.daemon=True p1.start() p2.start() print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止 迷惑人的例子?
五 進程同步(鎖)
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
?
part1:多個進程共享同一打印終端
#并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()#結果: 16924 is running 14620 is running 8640 is running 8640 is done 14620 is done 16924 is done 并發運行:效率高,但競爭統一打印終端,帶來了打印錯亂 #由并發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock):lock.acquire()print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())lock.release() if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,))p.start()#結果 11496 is running 11496 is done 13344 is running 13344 is done 11792 is running 11792 is done 加鎖:由并發變成了串行,犧牲了運行效率,但避免了競爭part2:多個進程共享同一文件
文件當數據庫,模擬搶票
#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()get() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start() 并發運行,效率高,但競爭同一文件,數據寫入錯亂 #文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()lock.acquire()get()lock.release() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start() 加鎖:購票行為由并發變成了串行,犧牲了運行效率,但保證了數據安全?
總結
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基于文件,而文件是硬盤上的數據) 2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊
為我們提供的基于消息的IPC通信機制:隊列和管道。 1 隊列和管道都是將數據存放于內存中 2 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可
獲展性。
?
六 隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通信(IPC),mulitiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。?? ??參數介紹:
1 maxsize是隊列中允許最大項數,省略則無大小限制。?方法介紹:
主要方法: 1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue
已滿,會立即拋出Queue.Full異常。2 q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且
timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一
個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.3 q.get_nowait():同q.get(False) 4 q.put_nowait():同q.put(False)5 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。6 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。7 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
其他方法(了解):
1 q.cancel_join_thread():不會在進程退出時自動連接后臺線程。可以防止join_thread()方法阻塞2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。3 q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為 View Code?
應用: from multiprocessing import Queue q = Queue(3)q.put(1) q.put(2) q.put(3) print(q.full()) #滿了print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了 View Code?
生產者消費者模型在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
?
基于隊列實現生產者消費者模型
import os import time import random from multiprocessing import Process,Queuedef consumer(q):while True:res = q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res = '包子%s' %iq.put(res)print('\033[42m%s 生產了 %s\033[0m' % (os.getpid(), res))if __name__ == '__main__':q = Queue()#生產者們:即廚師們p1 = Process(target=producer,args=(q,))#消費者們:即吃貨們c1 = Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主線程') View Code?
#生產者消費者模型總結#程序中有兩類角色 一類負責生產數據(生產者)一類負責處理數據(消費者)#引入生產者消費者模型為了解決的問題是: 平衡生產者與消費者之間的速度差#如何實現:生產者 -》隊列 ——》消費者#生產者消費者模型實現類程序的解耦和?
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環
import os import time import random from multiprocessing import Process,Queuedef consumer(q):while True:res = q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res = '包子%s' %iq.put(res)print('\033[42m%s 生產了 %s\033[0m' % (os.getpid(), res))q.put(None) #發送結束信號if __name__ == '__main__':q = Queue()#生產者們:即廚師們p1 = Process(target=producer,args=(q,))#消費者們:即吃貨們c1 = Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主線程') 生產者在生產完畢后發送結束信號None注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號
import os import time import random from multiprocessing import Process,Queuedef consumer(q):while True:res = q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res = '包子%s' %iq.put(res)print('\033[42m%s 生產了 %s\033[0m' % (os.getpid(), res))if __name__ == '__main__':q = Queue()#生產者們:即廚師們p1 = Process(target=producer,args=(q,))#消費者們:即吃貨們c1 = Process(target=consumer,args=(q,))#開始 p1.start()c1.start()p1.join()q.put(None) #發送結束信號print('主線程') 主進程在生產者生產完畢后發送結束信號None但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決
from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):for i in range(2):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))#開始 p1.start()p2.start()p3.start()c1.start()p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號 p2.join()p3.join()q.put(None) #有幾個消費者就應該發送幾次結束信號Noneq.put(None) #發送結束信號print('主') 有幾個消費者就需要發送幾次結束信號:相當low?
其實我們的思路無非是發送結束信號而已,有另外一種隊列提供了這種機制
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。#參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數
量,將引發ValueError異常q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法
為止 import os import random import time from multiprocessing import Process,JoinableQueuedef consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))q.task_done() #向q.join()發送一次信號,證明一個數據已經取走了def producer(name,q):for i in range(3):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.join()if __name__ == '__main__':q=JoinableQueue()#生產者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))c1.daemon = Truec2.daemon = True#開始p_l = [p1,p2,p3,c1,c2]for p in p_l:p.start()p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號 p2.join()p3.join()print('主')# 主進程等--->p1,p2 p3等---->c1,c2# p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據# 因而c1,c2也沒有存在的價值了,應該隨著主進程的結束而結束,所以設置成守護進程 View Code
?
七 數據共享展望未來,基于消息傳遞的并發編程是大勢所趨
即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合
通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中
進程間通信應該盡量避免使用本節所講的共享數據的方式
? 進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows otherprocesses to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore,
BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example, from multiprocessing import Manager,Process,Lockdef work(d,lock):with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(100):p=Process(target=work,args=(dic,lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic) from multiprocessing import Manager,Process,Lockdef work(d,lock):with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(100):p=Process(target=work,args=(dic,lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic) 進程之間操作共享數據
?
?
轉載于:https://www.cnblogs.com/zhaochangbo/p/7834343.html
總結
- 上一篇: 推荐几款好用实用的宝藏软件。记得收藏
- 下一篇: vsftp账号_Linux入门-Cent