python 多进程并发_python并发编程之多进程
一 multiprocessing模塊介紹
python中的多線程無法利用多核優(yōu)勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進(jìn)程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進(jìn)程,并在子進(jìn)程中執(zhí)行我們定制的任務(wù)(比如函數(shù)),該模塊與多線程模塊threading的編程接口類似。
multiprocessing模塊的功能眾多:支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強(qiáng)調(diào)的一點(diǎn)是:與線程不同,進(jìn)程沒有任何共享狀態(tài),進(jìn)程修改的數(shù)據(jù),改動僅限于該進(jìn)程內(nèi)。
二 Process類的介紹
創(chuàng)建進(jìn)程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實(shí)例化得到的對象,表示一個(gè)子進(jìn)程中的任務(wù)(尚未啟動)
強(qiáng)調(diào):
1. 需要使用關(guān)鍵字的方式來指定參數(shù)
2. args指定的為傳給target函數(shù)的位置參數(shù),是一個(gè)元組形式,必須有逗號
參數(shù)介紹:
1 group參數(shù)未使用,值始終為None
2
3 target表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù)
4
5 args表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'egon',)
6
7 kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18}
8
9 name為子進(jìn)程的名稱
方法介紹:
1 p.start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
2 p.run():進(jìn)程啟動時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法
3
4 p.terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖
5 p.is_alive():如果p仍然運(yùn)行,返回True
6
7 p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間,需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程
屬性介紹:
1 p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置2
3 p.name:進(jìn)程的名稱4
5 p.pid:進(jìn)程的pid6
7 p.exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號N結(jié)束(了解即可)8
9 p.authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)
三 Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':下
詳細(xì)解釋
創(chuàng)建并開啟子進(jìn)程的兩種方式
方法一
方法二
進(jìn)程直接的內(nèi)存空間是隔離的
View Code
練習(xí)1:把上周所學(xué)的socket通信變成并發(fā)的形式
server端
多個(gè)client端
這么實(shí)現(xiàn)有沒有問題???
Process對象的join方法
join:主進(jìn)程等,等待子進(jìn)程結(jié)束
有了join,程序不就是串行了嗎???
Process對象的其他方法或?qū)傩?了解)
terminate與is_alive
name與pid
僵尸進(jìn)程與孤兒進(jìn)程(了解)
View Code
四 守護(hù)進(jìn)程
主進(jìn)程創(chuàng)建守護(hù)進(jìn)程
其一:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進(jìn)程之間是互相獨(dú)立的,主進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止
View Code
迷惑人的例子
五 進(jìn)程同步(鎖)
進(jìn)程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個(gè)文件,或同一個(gè)打印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結(jié)果就是錯亂,如何控制,就是加鎖處理
part1:多個(gè)進(jìn)程共享同一打印終端
并發(fā)運(yùn)行,效率高,但競爭同一打印終端,帶來了打印錯亂
加鎖:由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競爭
part2:多個(gè)進(jìn)程共享同一文件
文件當(dāng)數(shù)據(jù)庫,模擬搶票
并發(fā)運(yùn)行,效率高,但競爭寫同一文件,數(shù)據(jù)寫入錯亂
加鎖:購票行為由并發(fā)變成了串行,犧牲了運(yùn)行效率,但保證了數(shù)據(jù)安全
總結(jié):
#加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數(shù)據(jù)安全。
雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信,但問題是:1.效率低(共享數(shù)據(jù)基于文件,而文件是硬盤上的數(shù)據(jù))2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個(gè)進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊(duì)列和管道。
1隊(duì)列和管道都是將數(shù)據(jù)存放于內(nèi)存中2 隊(duì)列又是基于(管道+鎖)實(shí)現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來,
我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊(duì)列,避免處理復(fù)雜的同步和鎖問題,而且在進(jìn)程數(shù)目增多時(shí),往往可以獲得更好的可獲展性。
六 隊(duì)列(推薦使用)
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
1 Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
參數(shù)介紹:
1 maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。
方法介紹:
主要方法:
1 q.put方法用以插入數(shù)據(jù)到隊(duì)列中,put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
2 q.get方法可以從隊(duì)列讀取并且刪除一個(gè)元素。同樣,get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常.
3
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6
7 q.empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中又加入了項(xiàng)目。
8 q.full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中的項(xiàng)目被取走。
9 q.qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣
其他方法(了解):
1 q.cancel_join_thread():不會在進(jìn)程退出時(shí)自動連接后臺線程。可以防止join_thread()方法阻塞
2 q.close():關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法,后臺線程將繼續(xù)寫入那些已經(jīng)入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將調(diào)用此方法。關(guān)閉隊(duì)列不會在隊(duì)列使用者中產(chǎn)生任何類型的數(shù)據(jù)結(jié)束信號或異常。例如,如果某個(gè)使用者正在被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會導(dǎo)致get()方法返回錯誤。
3 q.join_thread():連接隊(duì)列的后臺線程。此方法用于在調(diào)用q.close()方法之后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread方法可以禁止這種行為
應(yīng)用:
View Code
生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
View Code
#生產(chǎn)者消費(fèi)者模型總結(jié)
#程序中有兩類角色
一類負(fù)責(zé)生產(chǎn)數(shù)據(jù)(生產(chǎn)者)
一類負(fù)責(zé)處理數(shù)據(jù)(消費(fèi)者)#引入生產(chǎn)者消費(fèi)者模型為了解決的問題是:
平衡生產(chǎn)者與消費(fèi)者之間的速度差#如何實(shí)現(xiàn):
生產(chǎn)者-》隊(duì)列——》消費(fèi)者#生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)類程序的解耦和
此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號,這樣消費(fèi)者在接收到結(jié)束信號后就可以break出死循環(huán)
生產(chǎn)者在生產(chǎn)完畢后發(fā)送結(jié)束信號None
注意:結(jié)束信號None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號
主進(jìn)程在生產(chǎn)者生產(chǎn)完畢后發(fā)送結(jié)束信號None
但上述解決方式,在有多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者時(shí),我們則需要用一個(gè)很low的方式去解決
有幾個(gè)消費(fèi)者就需要發(fā)送幾次結(jié)束信號:相當(dāng)low
其實(shí)我們的思路無非是發(fā)送結(jié)束信號而已,有另外一種隊(duì)列提供了這種機(jī)制
#JoinableQueue([maxsize]):這就像是一個(gè)Queue對象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號和條件變量來實(shí)現(xiàn)的。
#參數(shù)介紹:
maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。#方法介紹:
JoinableQueue的實(shí)例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常
q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止
View Code
七 管道
進(jìn)程間通信(IPC)方式二:管道(不推薦使用,了解即可)
介紹
基于管道實(shí)現(xiàn)進(jìn)程間通信(與隊(duì)列的方式是類似的,隊(duì)列就是管道加鎖實(shí)現(xiàn)的)
注意:生產(chǎn)者和消費(fèi)者都沒有使用管道的某個(gè)端點(diǎn),就應(yīng)該將其關(guān)閉,如在生產(chǎn)者中關(guān)閉管道的右端,在消費(fèi)者中關(guān)閉管道的左端。如果忘記執(zhí)行這些步驟,程序可能再消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生產(chǎn)EOFError異常。因此在生產(chǎn)者中關(guān)閉管道不會有任何效果,付費(fèi)消費(fèi)者中也關(guān)閉了相同的管道端點(diǎn)。
管道可以用于雙向通信,利用通常在客戶端/服務(wù)器中使用的請求/響應(yīng)模型或遠(yuǎn)程過程調(diào)用,就可以使用管道編寫與進(jìn)程交互的程序
八 共享數(shù)據(jù)
展望未來,基于消息傳遞的并發(fā)編程是大勢所趨
即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合
通過消息隊(duì)列交換數(shù)據(jù)。這樣極大地減少了對使用鎖定和其他同步手段的需求,
還可以擴(kuò)展到分布式系統(tǒng)中
進(jìn)程間通信應(yīng)該盡量避免使用本節(jié)所講的共享數(shù)據(jù)的方式
進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的
雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此
A manager object returned by Manager() controls a server process which holds Python objectsandallows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Valueand Array. For example,
進(jìn)程之間操作共享的數(shù)據(jù)
九 信號量(了解)
信號量Semahpore(同線程一樣)
十 事件(了解)
Event(同線程一樣)
十一 進(jìn)程池
在利用Python進(jìn)行系統(tǒng)管理的時(shí)候,特別是同時(shí)操作多個(gè)文件目錄,或者遠(yuǎn)程控制多臺主機(jī),并行操作可以節(jié)約大量的時(shí)間。多進(jìn)程是實(shí)現(xiàn)并發(fā)的手段之一,需要注意的問題是:
很明顯需要并發(fā)執(zhí)行的任務(wù)通常要遠(yuǎn)大于核數(shù)
一個(gè)操作系統(tǒng)不可能無限開啟進(jìn)程,通常有幾個(gè)核就開幾個(gè)進(jìn)程
進(jìn)程開啟過多,效率反而會下降(開啟進(jìn)程是需要占用系統(tǒng)資源的,而且開啟多余核數(shù)目的進(jìn)程也無法做到并行)
例如當(dāng)被操作對象數(shù)目不大時(shí),可以直接利用multiprocessing中的Process動態(tài)成生多個(gè)進(jìn)程,十幾個(gè)還好,但如果是上百個(gè),上千個(gè)。。。手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時(shí)可以發(fā)揮進(jìn)程池的功效。
我們就可以通過維護(hù)一個(gè)進(jìn)程池來控制進(jìn)程數(shù)目,比如httpd的進(jìn)程模式,規(guī)定最小進(jìn)程數(shù)和最大進(jìn)程數(shù)...
ps:對于遠(yuǎn)程過程調(diào)用的高級應(yīng)用程序而言,應(yīng)該使用進(jìn)程池,Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時(shí),如果池還沒有滿,那么就會創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。
創(chuàng)建進(jìn)程池的類:如果指定numprocess為3,則進(jìn)程池會從無到有創(chuàng)建三個(gè)進(jìn)程,然后自始至終使用這三個(gè)進(jìn)程去執(zhí)行所有任務(wù),不會開啟其他進(jìn)程
1 Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池
參數(shù)介紹:
1 numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值
2 initializer:是每個(gè)工作進(jìn)程啟動時(shí)要執(zhí)行的可調(diào)用對象,默認(rèn)為None
3 initargs:是要傳給initializer的參數(shù)組
方法介紹:
主要方法:
1 p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()2 p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。3
4 p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成5 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用
其他方法(了解部分)
方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法
obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)一場。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。
obj.ready():如果調(diào)用完成,返回True
obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常
obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/p>
obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)
View Code
應(yīng)用:
from multiprocessing importPoolimportos,timedefwork(n):print('%s run' %os.getpid())
time.sleep(3)return n**2
if __name__ == '__main__':
p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)
res_l=[]for i in range(10):
res=p.apply(work,args=(i,)) #同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞,但不管該任務(wù)是否存在阻塞,同步調(diào)用都會在原地等著,只是等的過程中若是任務(wù)發(fā)生了阻塞就會被奪走cpu的執(zhí)行權(quán)限
res_l.append(res)print(res_l)
同步調(diào)用apply
from multiprocessing importPoolimportos,timedefwork(n):print('%s run' %os.getpid())
time.sleep(3)return n**2
if __name__ == '__main__':
p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)
res_l=[]for i in range(10):
res=p.apply_async(work,args=(i,)) #同步運(yùn)行,阻塞、直到本次任務(wù)執(zhí)行完畢拿到res
res_l.append(res)#異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果,否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了
p.close()
p.join()for res inres_l:print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
異步調(diào)用apply_async
#一:使用進(jìn)程池(異步調(diào)用,apply_async)#coding: utf-8
from multiprocessing importProcess,Poolimporttimedeffunc(msg):print( "msg:", msg)
time.sleep(1)returnmsgif __name__ == "__main__":
pool= Pool(processes = 3)
res_l=[]for i in range(10):
msg= "hello %d" %(i)
res=pool.apply_async(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去
res_l.append(res)print("==============================>") #沒有后面的join,或get,則程序整體結(jié)束,進(jìn)程池中的任務(wù)還沒來得及全部執(zhí)行完也都跟著主進(jìn)程一起結(jié)束了
pool.close()#關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成
pool.join() #調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束
print(res_l) #看到的是對象組成的列表,而非最終的結(jié)果,但這一步是在join后執(zhí)行的,證明結(jié)果已經(jīng)計(jì)算完畢,剩下的事情就是調(diào)用每個(gè)對象下的get方法去獲取結(jié)果
for i inres_l:print(i.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
#二:使用進(jìn)程池(同步調(diào)用,apply)#coding: utf-8
from multiprocessing importProcess,Poolimporttimedeffunc(msg):print( "msg:", msg)
time.sleep(0.1)returnmsgif __name__ == "__main__":
pool= Pool(processes = 3)
res_l=[]for i in range(10):
msg= "hello %d" %(i)
res=pool.apply(func, (msg, )) #維持執(zhí)行的進(jìn)程總數(shù)為processes,當(dāng)一個(gè)進(jìn)程執(zhí)行完畢后會添加新的進(jìn)程進(jìn)去
res_l.append(res) #同步執(zhí)行,即執(zhí)行完一個(gè)拿到結(jié)果,再去執(zhí)行另外一個(gè)
print("==============================>")
pool.close()
pool.join()#調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束
print(res_l) #看到的就是最終的結(jié)果組成的列表
for i in res_l: #apply是同步的,所以直接得到結(jié)果,沒有g(shù)et()方法
print(i)
詳解:apply_async與apply
練習(xí)2:使用進(jìn)程池維護(hù)固定數(shù)目的進(jìn)程(重寫練習(xí)1)
#Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())#開啟6個(gè)客戶端,會發(fā)現(xiàn)2個(gè)客戶端處于等待狀態(tài)#在每個(gè)進(jìn)程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個(gè),即多個(gè)客戶端公用4個(gè)進(jìn)程
from socket import *
from multiprocessing importPoolimportos
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)deftalk(conn,client_addr):print('進(jìn)程pid: %s' %os.getpid())whileTrue:try:
msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())exceptException:break
if __name__ == '__main__':
p=Pool()whileTrue:
conn,client_addr=server.accept()
p.apply_async(talk,args=(conn,client_addr))#p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時(shí)間只有一個(gè)客戶端能訪問
server端
from socket import *client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))whileTrue:
msg=input('>>:').strip()if not msg:continueclient.send(msg.encode('utf-8'))
msg=client.recv(1024)print(msg.decode('utf-8'))
客戶端
發(fā)現(xiàn):并發(fā)開啟多個(gè)客戶端,服務(wù)端同一時(shí)間只有3個(gè)不同的pid,干掉一個(gè)客戶端,另外一個(gè)客戶端才會進(jìn)來,被3個(gè)進(jìn)程之一處理
回掉函數(shù):
需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時(shí)間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時(shí)就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。
from multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))
respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defpasrse_page(res):print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']
p=Pool(3)
res_l=[]for url inurls:
res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
res_l.append(res)
p.close()
p.join()print([res.get() for res in res_l]) #拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了
'''打印結(jié)果:
get https://www.baidu.com
get https://www.python.org
get https://www.openstack.org
get https://help.github.com/
parse https://www.baidu.com
get http://www.sina.com.cn/
parse https://www.python.org
parse https://help.github.com/
parse http://www.sina.com.cn/
parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '\r\n...',...}]'''
View Code
from multiprocessing importPoolimporttime,randomimportrequestsimportredefget_page(url,pattern):
response=requests.get(url)if response.status_code == 200:return(response.text,pattern)defparse_page(info):
page_content,pattern=info
res=re.findall(pattern,page_content)for item inres:
dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]
}print(dic)if __name__ == '__main__':
pattern1=re.compile(r'
.*?board-index.*?>(\d+)<.>(.*?)<.>(.*?)<.>(.*?)<.>(.*?)url_dic={'http://maoyan.com/board/7':pattern1,
}
p=Pool()
res_l=[]for url,pattern inurl_dic.items():
res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
res_l.append(res)for i inres_l:
i.get()#res=requests.get('http://maoyan.com/board/7')
#print(re.findall(pattern,res.text))
爬蟲案例
如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)
from multiprocessing importPoolimporttime,random,osdefwork(n):
time.sleep(1)return n**2
if __name__ == '__main__':
p=Pool()
res_l=[]for i in range(10):
res=p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join()#等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢
nums=[]for res inres_l:
nums.append(res.get())#拿到所有結(jié)果
print(nums) #主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理
View Code
總結(jié)
以上是生活随笔為你收集整理的python 多进程并发_python并发编程之多进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据结构实验:电话号码查询系统
- 下一篇: 主流的新闻APP 用的 推送SDK 记录