python之路-进程
理論知識(shí)
操作系統(tǒng)背景知識(shí)
顧名思義,進(jìn)程即程序正在執(zhí)行的一個(gè)過(guò)程,進(jìn)程是對(duì)正在運(yùn)行的程序的一個(gè)抽象.
進(jìn)程的概念起源于操作系統(tǒng),是操作系統(tǒng)最核心的概念,也是操作系統(tǒng)提供的最古老也是最總要的抽象概念之一.操作系統(tǒng)的其他所有內(nèi)容都是圍繞進(jìn)程概念展開(kāi)的.
所以想要真正了解進(jìn)程,必須事先了解操作系統(tǒng)
PS:即使可以利用cpu只有一個(gè)(早期的計(jì)算機(jī)確實(shí)如此),也能保證支持(偽)并發(fā)的能力,將一個(gè)單獨(dú)的cpu變成多個(gè)虛擬的cup(多道技術(shù):時(shí)間多路復(fù)用和空間多路復(fù)用+硬件上支持隔離),沒(méi)有進(jìn)程的抽象,現(xiàn)代計(jì)算機(jī)將不復(fù)存在.
必備的理論基礎(chǔ):
一 操作系統(tǒng)的作用:
1:隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口
2:管理,調(diào)度進(jìn)程,并且將多個(gè)進(jìn)程對(duì)硬件的競(jìng)爭(zhēng)變的有序
二 多道技術(shù):
1.產(chǎn)生背景:針對(duì)單核,實(shí)現(xiàn)開(kāi)發(fā)
ps:
現(xiàn)在的主機(jī)一般是多核,那么每個(gè)核都會(huì)利用多道技術(shù)
有4個(gè)cpu,運(yùn)行于cpu1的某個(gè)程序遇到io阻塞,會(huì)等到io結(jié)束再重新調(diào)度,會(huì)被調(diào)度到4個(gè)cpu中的任意一個(gè),具體由操作系統(tǒng)調(diào)度算法決定.
2.空間上的復(fù)用:如內(nèi)存中同時(shí)有多道程序
3.時(shí)間上的復(fù)用:復(fù)用一個(gè)cpu的時(shí)間片
強(qiáng)調(diào):遇到io切,占用cpu時(shí)間過(guò)長(zhǎng)也切,核心在于切之前將進(jìn)程的狀態(tài)保存下來(lái),這樣才能保證下次切換回來(lái)時(shí),能基于上次切走的位置繼續(xù)運(yùn)行
什么是進(jìn)程
進(jìn)程(Process)是計(jì)算機(jī)中程序關(guān)于某數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),是系統(tǒng)進(jìn)行資源分配好調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ).在早期面向進(jìn)程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中,進(jìn)程是程序 基本執(zhí)行實(shí)體;在當(dāng)代面向線程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中,進(jìn)程是線程的容器,線程是程序的基本執(zhí)行實(shí)體.程序是指令,數(shù)據(jù)以及其組織形式的描述,進(jìn)程是程序的實(shí)體.
狹義的定義:進(jìn)程是正在運(yùn)行的程序的實(shí)例(an instance of a computer program that exected)
廣義定義:進(jìn)程是一個(gè)具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合的一次運(yùn)行活動(dòng)。它是操作系統(tǒng)動(dòng)態(tài)執(zhí)行的基本單元,在傳統(tǒng)的操作系統(tǒng)中,進(jìn)程既是基本的分配單元,也是基本的執(zhí)行單元。
進(jìn)程的概念
第一,進(jìn)程是一個(gè)實(shí)體。每一個(gè)進(jìn)程都有它自己的地址空間,一般情況下,包括文本區(qū)域(text region)、數(shù)據(jù)區(qū)域(data region)和堆棧(stack region)。文本區(qū)域存儲(chǔ)處理器執(zhí)行的代碼;數(shù)據(jù)區(qū)域存儲(chǔ)變量和進(jìn)程執(zhí)行期間使用的動(dòng)態(tài)分配的內(nèi)存;堆棧區(qū)域存儲(chǔ)著活動(dòng)過(guò)程調(diào)用的指令和本地變量。
第二,進(jìn)程是一個(gè)“執(zhí)行中的程序”。程序是一個(gè)沒(méi)有生命的實(shí)體,只有處理器賦予程序生命時(shí)(操作系統(tǒng)執(zhí)行之),它才能成為一個(gè)活動(dòng)的實(shí)體,我們稱(chēng)其為進(jìn)程。[3]
進(jìn)程是操作系統(tǒng)中最基本、重要的概念。是多道程序系統(tǒng)出現(xiàn)后,為了刻畫(huà)系統(tǒng)內(nèi)部出現(xiàn)的動(dòng)態(tài)情況,描述系統(tǒng)內(nèi)部各道程序的活動(dòng)規(guī)律引進(jìn)的一個(gè)概念,所有多道程序設(shè)計(jì)操作系統(tǒng)都建立在進(jìn)程的基礎(chǔ)上。
操作系統(tǒng)引入進(jìn)程的概念的原因
從理論角度看,是對(duì)正在運(yùn)行的程序過(guò)程的抽象;
從實(shí)現(xiàn)角度看,是一種數(shù)據(jù)結(jié)構(gòu),目的在于清晰地刻畫(huà)動(dòng)態(tài)系統(tǒng)的內(nèi)在規(guī)律,有效管理和調(diào)度進(jìn)入計(jì)算機(jī)系統(tǒng)主存儲(chǔ)器運(yùn)行的程序.
進(jìn)程的特征
動(dòng)態(tài)性:進(jìn)程的實(shí)質(zhì)是程序 在多道程序系統(tǒng)中的一次執(zhí)行過(guò)程,進(jìn)程是動(dòng)態(tài)產(chǎn)生,動(dòng)態(tài)消亡的.
并發(fā)性:任何進(jìn)程都可以同其他進(jìn)程一起并發(fā)執(zhí)行
獨(dú)立性:進(jìn)程是一個(gè)獨(dú)立運(yùn)行的基本單位,同時(shí)也是系統(tǒng)分配資源和調(diào)度的獨(dú)立單位;
異步性:由于進(jìn)程間的相互制約,使進(jìn)程具有執(zhí)行的間斷性,即進(jìn)程按各自獨(dú)立的,不可預(yù)知的速度向前推進(jìn)
結(jié)構(gòu)特征:進(jìn)程由程序,數(shù)據(jù)和進(jìn)程控制塊三部分組成.
多個(gè)不同的進(jìn)程可以包含相同的程序:一個(gè)程序在不同的數(shù)據(jù)集里就構(gòu)成不同的進(jìn)程,嫩得到不同的結(jié)果;但是執(zhí)行過(guò)程中,程序不能發(fā)生改變.
進(jìn)程與程序中的區(qū)別
程序是指令和數(shù)據(jù)的有序集合,其本身沒(méi)有任何運(yùn)行的含義,是一個(gè)靜態(tài)的概念.
而進(jìn)程是程序在處理機(jī)上的一次執(zhí)行過(guò)程,它是一個(gè)動(dòng)態(tài)的概念.
程序可以作為一種軟件資料長(zhǎng)期存在,而進(jìn)程是有一定生命期的.
程序是永久的,進(jìn)程是暫時(shí)的.
注意:同一個(gè)程序執(zhí)行兩次,就會(huì)在操作系統(tǒng)中出現(xiàn)兩個(gè)進(jìn)程,所以我們可以同時(shí)運(yùn)行一個(gè)軟件,分別做不同的事情也不會(huì)混亂.
進(jìn)程調(diào)度
想要多個(gè)進(jìn)程交替運(yùn)行,操作系統(tǒng)必須對(duì)這些進(jìn)程進(jìn)行調(diào)度,這個(gè)調(diào)度也不是隨機(jī)進(jìn)行的,而是需要遵循一定的原則,由此就有了進(jìn)程的調(diào)度算法.
先來(lái)先服務(wù)調(diào)度算法:
先來(lái)先服務(wù)(FCFS)調(diào)度算法是一種最簡(jiǎn)單的調(diào)度算法,該算法即可以用于作業(yè)調(diào)度,也可用于進(jìn)程調(diào)度,FCFS算法比較有利于長(zhǎng)作業(yè)(進(jìn)程),而不利于短作業(yè)(進(jìn)程).由此可知,本算法適合于CPU繁忙型作業(yè),而不利于I/O繁忙型的作業(yè)(進(jìn)程)
短作業(yè)優(yōu)先調(diào)度算法
短作業(yè)(進(jìn)程)優(yōu)先調(diào)度算法(SJ/PF)是指對(duì)短作業(yè)或者短進(jìn)程優(yōu)先調(diào)度的算法,該算法即可用于作業(yè)調(diào)度,也可用于進(jìn)程調(diào)度,但其對(duì)長(zhǎng)作業(yè)不利;不能保證緊迫性作業(yè)(進(jìn)程)被及時(shí)處理;作業(yè)長(zhǎng)短只是被估算出來(lái)的.
時(shí)間片輪轉(zhuǎn)法
時(shí)間片輪轉(zhuǎn)(Round Robin,RR)法的基本思路是讓每個(gè)進(jìn)程在就緒隊(duì)列中的等待時(shí)間與享受服務(wù)的時(shí)間成比例。在時(shí)間片輪轉(zhuǎn)法中,需要將CPU的處理時(shí)間分成固定大小的時(shí)間片,例如,幾十毫秒至幾百毫秒。如果一個(gè)進(jìn)程在被調(diào)度選中之后用完了系統(tǒng)規(guī)定的時(shí)間片,但又未完成要求的任務(wù),則它自行釋放自己所占有的CPU而排到就緒隊(duì)列的末尾,等待下一次調(diào)度。同時(shí),進(jìn)程調(diào)度程序又去調(diào)度當(dāng)前就緒隊(duì)列中的第一個(gè)進(jìn)程。
顯然,輪轉(zhuǎn)法只能用來(lái)調(diào)度分配一些可以搶占的資源。這些可以搶占的資源可以隨時(shí)被剝奪,而且可以將它們?cè)俜峙浣o別的進(jìn)程。CPU是可搶占資源的一種。但打印機(jī)等資源是不可搶占的。由于作業(yè)調(diào)度是對(duì)除了CPU之外的所有系統(tǒng)硬件資源的分配,其中包含有不可搶占資源,所以作業(yè)調(diào)度不使用輪轉(zhuǎn)法。
在輪轉(zhuǎn)法中,時(shí)間片長(zhǎng)度的選取非常重要。首先,時(shí)間片長(zhǎng)度的選擇會(huì)直接影響到系統(tǒng)的開(kāi)銷(xiāo)和響應(yīng)時(shí)間。如果時(shí)間片長(zhǎng)度過(guò)短,則調(diào)度程序搶占處理機(jī)的次數(shù)增多。這將使進(jìn)程上下文切換次數(shù)也大大增加,從而加重系統(tǒng)開(kāi)銷(xiāo)。反過(guò)來(lái),如果時(shí)間片長(zhǎng)度選擇過(guò)長(zhǎng),例如,一個(gè)時(shí)間片能保證就緒隊(duì)列中所需執(zhí)行時(shí)間最長(zhǎng)的進(jìn)程能執(zhí)行完畢,則輪轉(zhuǎn)法變成了先來(lái)先服務(wù)法。時(shí)間片長(zhǎng)度的選擇是根據(jù)系統(tǒng)對(duì)響應(yīng)時(shí)間的要求和就緒隊(duì)列中所允許最大的進(jìn)程數(shù)來(lái)確定的。
在輪轉(zhuǎn)法中,加入到就緒隊(duì)列的進(jìn)程有3種情況:
一種是分給它的時(shí)間片用完,但進(jìn)程還未完成,回到就緒隊(duì)列的末尾等待下次調(diào)度去繼續(xù)執(zhí)行。
另一種情況是分給該進(jìn)程的時(shí)間片并未用完,只是因?yàn)檎?qǐng)求I/O或由于進(jìn)程的互斥與同步關(guān)系而被阻塞。當(dāng)阻塞解除之后再回到就緒隊(duì)列。
第三種情況就是新創(chuàng)建進(jìn)程進(jìn)入就緒隊(duì)列。
如果對(duì)這些進(jìn)程區(qū)別對(duì)待,給予不同的優(yōu)先級(jí)和時(shí)間片從直觀上看,可以進(jìn)一步改善系統(tǒng)服務(wù)質(zhì)量和效率。例如,我們可把就緒隊(duì)列按照進(jìn)程到達(dá)就緒隊(duì)列的類(lèi)型和進(jìn)程被阻塞時(shí)的阻塞原因分成不同的就緒隊(duì)列,每個(gè)隊(duì)列按FCFS原則排列,各隊(duì)列之間的進(jìn)程享有不同的優(yōu)先級(jí),但同一隊(duì)列內(nèi)優(yōu)先級(jí)相同。這樣,當(dāng)一個(gè)進(jìn)程在執(zhí)行完它的時(shí)間片之后,或從睡眠中被喚醒以及被創(chuàng)建之后,將進(jìn)入不同的就緒隊(duì)列。
多級(jí)反饋隊(duì)列
前面介紹的各種用作進(jìn)程調(diào)度的算法都有一定的局限性,如短進(jìn)程優(yōu)先的調(diào)度算法,僅照顧了短進(jìn)程而忽略了長(zhǎng)進(jìn)程,而且如果并未指明進(jìn)程的長(zhǎng)度,則短進(jìn)程優(yōu)先和基于進(jìn)程長(zhǎng)度的搶占式調(diào)度算法都將無(wú)法使用.
而多級(jí)反饋隊(duì)列調(diào)度算法則不必事先知道各種進(jìn)程所需的執(zhí)行時(shí)間,而且還可以滿足各種類(lèi)型進(jìn)程的需要,因而它是目前被公認(rèn)的一種較好的進(jìn)程調(diào)度算法,在采用多級(jí)反饋隊(duì)列調(diào)度算法的系統(tǒng)中,掉度算法的實(shí)施過(guò)程如下所述.
(1)應(yīng)設(shè)置多個(gè)就緒隊(duì)列,并為各個(gè)隊(duì)列賦予不同的優(yōu)先級(jí),第一個(gè)隊(duì)列的優(yōu)先級(jí)最高,第二個(gè)隊(duì)列次之,其余各隊(duì)列的優(yōu)先權(quán)逐個(gè)降低.該算法賦予各個(gè)隊(duì)列中進(jìn)程執(zhí)行時(shí)間片的大小也各不相同,在優(yōu)先權(quán)愈高的隊(duì)列中,為每個(gè)進(jìn)程所規(guī)定的執(zhí)行時(shí)間片愈小.例如,第二個(gè)隊(duì)列的時(shí)間片要比第一個(gè)隊(duì)列的時(shí)間片長(zhǎng)一杯,......,第i+1個(gè)隊(duì)列的時(shí)間片要比第i個(gè)列隊(duì)的時(shí)間片長(zhǎng)一倍.
(2) 當(dāng)一個(gè)新進(jìn)程進(jìn)入內(nèi)存后,首先將它放入第一隊(duì)列的末尾,按FCFS原則排隊(duì)等待調(diào)度。當(dāng)輪到該進(jìn)程執(zhí)行時(shí),如它能在該時(shí)間片內(nèi)完成,便可準(zhǔn)備撤離系統(tǒng);如果它在一個(gè)時(shí)間片結(jié)束時(shí)尚未完成,調(diào)度程序便將該進(jìn)程轉(zhuǎn)入第二隊(duì)列的末尾,再同樣地按FCFS原則等待調(diào)度執(zhí)行;如果它在第二隊(duì)列中運(yùn)行一個(gè)時(shí)間片后仍未完成,再依次將它放入第三隊(duì)列,……,如此下去,當(dāng)一個(gè)長(zhǎng)作業(yè)(進(jìn)程)從第一隊(duì)列依次降到第n隊(duì)列后,在第n 隊(duì)列便采取按時(shí)間片輪轉(zhuǎn)的方式運(yùn)行。
(3) 僅當(dāng)?shù)谝魂?duì)列空閑時(shí),調(diào)度程序才調(diào)度第二隊(duì)列中的進(jìn)程運(yùn)行;僅當(dāng)?shù)?~(i-1)隊(duì)列均空時(shí),才會(huì)調(diào)度第i隊(duì)列中的進(jìn)程運(yùn)行。如果處理機(jī)正在第i隊(duì)列中為某進(jìn)程服務(wù)時(shí),又有新進(jìn)程進(jìn)入優(yōu)先權(quán)較高的隊(duì)列(第1~(i-1)中的任何一個(gè)隊(duì)列),則此時(shí)新進(jìn)程將搶占正在運(yùn)行進(jìn)程的處理機(jī),即由調(diào)度程序把正在運(yùn)行的進(jìn)程放回到第i隊(duì)列的末尾,把處理機(jī)分配給新到的高優(yōu)先權(quán)進(jìn)程
進(jìn)程的并行與并發(fā)
并行:并行是指兩者同時(shí)執(zhí)行,比如有兩條車(chē)道,在某一個(gè)時(shí)間點(diǎn),兩條車(chē)道上都有車(chē)在跑;(資源夠用,比如三個(gè)線程,四核的cpu)
并發(fā):并發(fā)是指資源有限的情況下,兩者交替輪流使用資源,比如只有一條車(chē)道(單核cpu資源),那么就是A車(chē)先走,在某個(gè)時(shí)刻A車(chē)退出把道路讓給B走,B走完了繼續(xù)給A,交替使用,目的是提高效率.
區(qū)別:
并行是從微觀上,也就是一個(gè)精確的時(shí)間片刻,有不同的程序在執(zhí)行,這就要求必須有多個(gè)處理器.
并發(fā)是從宏觀上,在一個(gè)時(shí)間段上可以看出是同時(shí)執(zhí)行的,比如一個(gè)服務(wù)器同時(shí)處理多個(gè)session.
注意:早期單核cpu時(shí)候,對(duì)于進(jìn)程也是微觀上串行,(站在cpu角度看),宏觀上并行(站在人的角度看就是同時(shí)有很多程序在執(zhí)行).
同步異步阻塞非阻塞
狀態(tài)介紹
在了解其他概念之前,我們首先了解進(jìn)程的幾個(gè)狀態(tài),在程序運(yùn)行的過(guò)程中,由于被操作系統(tǒng)的調(diào)度算法控制,程序會(huì)進(jìn)入幾個(gè)狀態(tài):就緒,運(yùn)行,阻塞.
(1)就緒(Ready)狀態(tài)
在進(jìn)程已經(jīng)分配到除cpu以外的所有必要的資源,只要獲得處理機(jī)便立即執(zhí)行,這時(shí)的進(jìn)程狀態(tài)稱(chēng)為就緒狀態(tài).
(2)執(zhí)行/運(yùn)行(Running)狀態(tài)當(dāng)進(jìn)程已獲得處理機(jī),其程序正在處理機(jī)上執(zhí)行,此時(shí)的進(jìn)程狀態(tài)稱(chēng)為執(zhí)行狀態(tài).
(3)阻塞(Blocked)狀態(tài)正在執(zhí)行的過(guò)程,由于等待某個(gè)事件發(fā)生而無(wú)法執(zhí)行時(shí),便放棄處理機(jī)而處于阻塞狀態(tài).引起進(jìn)程的時(shí)間可能有多種,例如,等待I/O完成,(input),申請(qǐng)緩沖區(qū)不能滿足,等待信件(信號(hào))等.
同步和異步
所謂同步就是一個(gè)任務(wù)的完成需要依賴另外一個(gè)任務(wù)時(shí),只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成,這是一種可靠的任務(wù)序列。要么成功都成功,失敗都失敗,兩個(gè)任務(wù)的狀態(tài)可以保持一致。
所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作,依賴的任務(wù)也立即執(zhí)行,只要自己完成了整個(gè)任務(wù)就算完成了。至于被依賴的任務(wù)最終是否真正完成,依賴它的任務(wù)無(wú)法確定,所以它是不可靠的任務(wù)序列。
例如:
比如我去銀行辦理業(yè)務(wù),可能會(huì)有兩種方式:
第一種:選擇排隊(duì)等候;
第二種:選擇取一個(gè)小紙條上面有我的號(hào)碼,等到排到我這一號(hào)時(shí)由柜臺(tái)的人通知我輪到我去辦理業(yè)務(wù)了;
第一種:前者(排隊(duì)等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業(yè)務(wù)情況;
第二種:后者(等待別人通知)就是異步等待消息通知.在異步消息處理中,等待消息通知者(在這個(gè)例子中就是等待辦理業(yè)務(wù)的人)往往注冊(cè)一個(gè)回調(diào)機(jī)制,在所等待的時(shí)間被觸發(fā)時(shí)由觸發(fā)機(jī)制(在這里是柜臺(tái)的人)通過(guò)某種機(jī)制(在這里是寫(xiě)在小紙條上的號(hào)碼,喊號(hào))找到等待該事件的人
阻塞與非阻塞
阻塞和非阻塞這兩個(gè)概念與程序(線程)等待消息通知(無(wú)所謂同步或者異步)時(shí)的狀態(tài)有關(guān).也就是說(shuō)阻塞與非阻塞主要程序(線程)等待消息通知時(shí)的狀態(tài)角度來(lái)說(shuō)的
例子:
繼續(xù)上面的那個(gè)例子,不論是排隊(duì)還是使用號(hào)碼等待通知,如果在這個(gè)等待的過(guò)程中,等待者除了等待消息通知之外不能做其它的事情,那么該機(jī)制就是阻塞的,表現(xiàn)在程序中,也就是該程序一直阻塞在該函數(shù)調(diào)用處不能繼續(xù)往下執(zhí)行。
相反,有的人喜歡在銀行辦理這些業(yè)務(wù)的時(shí)候一邊打打電話發(fā)發(fā)短信一邊等待,這樣的狀態(tài)就是非阻塞的,因?yàn)樗?等待者)沒(méi)有阻塞在這個(gè)消息通知上,而是一邊做自己的事情一邊等待。
注意:同步非阻塞形式實(shí)際上是效率低下的,想象一下你一邊打著電話一邊還需要抬頭看到底隊(duì)伍排到你了沒(méi)有。如果把打電話和觀察排隊(duì)的位置看成是程序的兩個(gè)操作的話,這個(gè)程序需要在這兩種不同的行為之間來(lái)回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒(méi)有這樣的問(wèn)題,因?yàn)榇螂娫捠悄?等待者)的事情,而通知你則是柜臺(tái)(消息觸發(fā)機(jī)制)的事情,程序沒(méi)有在兩種不同的操作中來(lái)回切換。
同步/異步與阻塞/非阻塞
1.同步阻塞形式
效率最低,拿上面的例子來(lái)說(shuō),就是你專(zhuān)心排隊(duì),什么別的事都不做.
2.異步阻塞形式
如果在銀行等待辦理業(yè)務(wù)的人采用的是異步的方式去等待消息被觸發(fā)(通知),也就是領(lǐng)了一張小紙條,假如在這段時(shí)間里他不能離開(kāi)銀行做其它的事情,那么很顯然,這個(gè)人被阻塞在了這個(gè)等待的操作上面;
異步操作是可以被阻塞住的,只不過(guò)它不是在處理消息時(shí)阻塞,而是在等待消息通知時(shí)被阻塞。
實(shí)際上是效率低下的。
想象一下你一邊打著電話一邊還需要抬頭看到底隊(duì)伍排到你了沒(méi)有(兩個(gè)操作不能同時(shí)執(zhí)行,因?yàn)槭峭?#xff09;,如果把打電話和觀察排隊(duì)的位置看成是程序的兩個(gè)操作的話,這個(gè)程序需要在這兩種不同的行為之間來(lái)回的切換,效率可想而知是低下的。
效率更高,
因?yàn)榇螂娫捠悄?等待者)的事情,而通知你則是柜臺(tái)(消息觸發(fā)機(jī)制)的事情,程序沒(méi)有在兩種不同的操作中來(lái)回切換。
比如說(shuō),這個(gè)人突然發(fā)覺(jué)自己煙癮犯了,需要出去抽根煙,于是他告訴大堂經(jīng)理說(shuō),排到我這個(gè)號(hào)碼的時(shí)候麻煩到外面通知我一下,那么他就沒(méi)有被阻塞在這個(gè)等待的操作上面,自然這個(gè)就是異步+非阻塞的方式了。
很多人會(huì)把同步和阻塞混淆,是因?yàn)楹芏鄷r(shí)候同步操作會(huì)以阻塞的形式表現(xiàn)出來(lái),同樣的,很多人也會(huì)把異步和非阻塞混淆,因?yàn)楫惒讲僮饕话愣疾粫?huì)在真正的IO操作處被阻塞。
進(jìn)程的創(chuàng)建與結(jié)束
進(jìn)程的創(chuàng)建
但凡是硬件,都需要有操作系統(tǒng)去管理,只要有操作系統(tǒng),就有進(jìn)程的概念,就需要有創(chuàng)建進(jìn)程的方式,一些操作系統(tǒng)只為一個(gè)應(yīng)用程序設(shè)計(jì),比如微波爐中的控制器,一旦啟動(dòng)微波爐,所有的進(jìn)程都已經(jīng)存在。
而對(duì)于通用系統(tǒng)(跑很多應(yīng)用程序),需要有系統(tǒng)運(yùn)行過(guò)程中創(chuàng)建或撤銷(xiāo)進(jìn)程的能力,主要分為4中形式創(chuàng)建新的進(jìn)程:
1. 系統(tǒng)初始化(查看進(jìn)程linux中用ps命令,windows中用任務(wù)管理器,前臺(tái)進(jìn)程負(fù)責(zé)與用戶交互,后臺(tái)運(yùn)行的進(jìn)程與用戶無(wú)關(guān),運(yùn)行在后臺(tái)并且只在需要時(shí)才喚醒的進(jìn)程,稱(chēng)為守護(hù)進(jìn)程,如電子郵件、web頁(yè)面、新聞、打印)
2. 一個(gè)進(jìn)程在運(yùn)行過(guò)程中開(kāi)啟了子進(jìn)程(如nginx開(kāi)啟多進(jìn)程,os.fork,subprocess.Popen等)
3. 用戶的交互式請(qǐng)求,而創(chuàng)建一個(gè)新進(jìn)程(如用戶雙擊暴風(fēng)影音)
4. 一個(gè)批處理作業(yè)的初始化(只在大型機(jī)的批處理系統(tǒng)中應(yīng)用)
無(wú)論哪一種,新進(jìn)程的創(chuàng)建都是由一個(gè)已經(jīng)存在的進(jìn)程執(zhí)行了一個(gè)用于創(chuàng)建進(jìn)程的系統(tǒng)調(diào)用而創(chuàng)建的。
1. 在UNIX中該系統(tǒng)調(diào)用是:fork,fork會(huì)創(chuàng)建一個(gè)與父進(jìn)程一模一樣的副本,二者有相同的存儲(chǔ)映像、同樣的環(huán)境字符串和同樣的打開(kāi)文件(在shell解釋器進(jìn)程中,執(zhí)行一個(gè)命令就會(huì)創(chuàng)建一個(gè)子進(jìn)程)2. 在windows中該系統(tǒng)調(diào)用是:CreateProcess,CreateProcess既處理進(jìn)程的創(chuàng)建,也負(fù)責(zé)把正確的程序裝入新進(jìn)程。關(guān)于創(chuàng)建子進(jìn)程,UNIX和windows1.相同的是:進(jìn)程創(chuàng)建后,父進(jìn)程和子進(jìn)程有各自不同的地址空間(多道技術(shù)要求物理層面實(shí)現(xiàn)進(jìn)程之間內(nèi)存的隔離),任何一個(gè)進(jìn)程的在其地址空間中的修改都不會(huì)影響到另外一個(gè)進(jìn)程。2.不同的是:在UNIX中,子進(jìn)程的初始地址空間是父進(jìn)程的一個(gè)副本,提示:子進(jìn)程和父進(jìn)程是可以有只讀的共享內(nèi)存區(qū)的。但是對(duì)于windows系統(tǒng)來(lái)說(shuō),從一開(kāi)始父進(jìn)程與子進(jìn)程的地址空間就是不同的。進(jìn)程的結(jié)束
1. 正常退出(自愿,如用戶點(diǎn)擊交互式頁(yè)面的叉號(hào),或程序執(zhí)行完畢調(diào)用發(fā)起系統(tǒng)調(diào)用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯(cuò)退出(自愿,python a.py中a.py不存在)
3. 嚴(yán)重錯(cuò)誤(非自愿,執(zhí)行非法指令,如引用不存在的內(nèi)存,1/0等,可以捕捉異常,try...except...)
4. 被其他進(jìn)程殺死(非自愿,如kill -9)
在python程序中的進(jìn)程操作
之前我們已經(jīng)了解了很多進(jìn)程相關(guān)的理論知識(shí),了解進(jìn)程是什么應(yīng)該不再困難了,剛剛我們已經(jīng)了解了,運(yùn)行中的程序就是一個(gè)進(jìn)程。所有的進(jìn)程都是通過(guò)它的父進(jìn)程來(lái)創(chuàng)建的。因此,運(yùn)行起來(lái)的python程序也是一個(gè)進(jìn)程,那么我們也可以在程序中再創(chuàng)建進(jìn)程。多個(gè)進(jìn)程可以實(shí)現(xiàn)并發(fā)效果,也就是說(shuō),當(dāng)我們的程序中存在多個(gè)進(jìn)程的時(shí)候,在某些時(shí)候,就會(huì)讓程序的執(zhí)行速度變快。以我們之前所學(xué)的知識(shí),并不能實(shí)現(xiàn)創(chuàng)建進(jìn)程這個(gè)功能,所以我們就需要借助python中強(qiáng)大的模塊。
?
multiprocessing模塊
?仔細(xì)說(shuō)來(lái),multiprocessing不是一個(gè)模塊而是python中一個(gè)操作、管理進(jìn)程的包。 之所以叫multi是取自multiple的多功能的意思,在這個(gè)包中幾乎包含了和進(jìn)程有關(guān)的所有子模塊。由于提供的子模塊非常多,為了方便大家歸類(lèi)記憶,我將這部分大致分為四個(gè)部分:創(chuàng)建進(jìn)程部分,進(jìn)程同步部分,進(jìn)程池部分,進(jìn)程之間數(shù)據(jù)共享。
multiprocessing.process模塊
process模塊介紹
process模塊是一個(gè)創(chuàng)建進(jìn)程的模塊,借助這個(gè)模塊,就可以完成進(jìn)程的創(chuàng)建。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類(lèi)實(shí)例化得到的對(duì)象,表示一個(gè)子進(jìn)程中的任務(wù)(尚未啟動(dòng))強(qiáng)調(diào): 1. 需要使用關(guān)鍵字的方式來(lái)指定參數(shù) 2. args指定的為傳給target函數(shù)的位置參數(shù),是一個(gè)元組形式,必須有逗號(hào)參數(shù)介紹: group參數(shù)未使用,值始終為None target表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù) args表示調(diào)用對(duì)象的位置參數(shù)元組,args=(1,2,'egon',) kwargs表示調(diào)用對(duì)象的字典,kwargs={'name':'egon','age':18} name為子進(jìn)程的名稱(chēng) p.start():啟動(dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run() p.run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類(lèi)的類(lèi)中一定要實(shí)現(xiàn)該方法 p.terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖 p.is_alive():如果p仍然運(yùn)行,返回True p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間,需要強(qiáng)調(diào)的是,p.join只能join住start開(kāi)啟的進(jìn)程,而不能join住run開(kāi)啟的進(jìn)程 方法介紹 p.daemon:默認(rèn)值為False,如果設(shè)為T(mén)rue,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為T(mén)rue后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置 p.name:進(jìn)程的名稱(chēng) p.pid:進(jìn)程的pid p.exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可) p.authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類(lèi)連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)屬性介紹 在Windows操作系統(tǒng)中由于沒(méi)有fork(linux操作系統(tǒng)中創(chuàng)建進(jìn)程的機(jī)制),在創(chuàng)建子進(jìn)程的時(shí)候會(huì)自動(dòng)import 啟動(dòng)它的這個(gè)文件,而在 import 的時(shí)候又執(zhí)行了整個(gè)文件。因此如果將process()
直接寫(xiě)在文件中就會(huì)無(wú)限遞歸創(chuàng)建子進(jìn)程報(bào)錯(cuò)。所以必須把創(chuàng)建子進(jìn)程的部分使用if __name__ ==‘__main__’
判斷保護(hù)起來(lái),import 的時(shí)候 ,就不會(huì)遞歸運(yùn)行了。
使用process模塊創(chuàng)建進(jìn)程
在一個(gè)python進(jìn)程中開(kāi)啟子進(jìn)程,start方法和并發(fā)的效果.
from multiprocessing import Process import timedef func():time.sleep(1)print('這里是son')if __name__ == '__main__':p = Process(target = func)p.start()time.sleep(2)print('這里是father') from multiprocessing import Process import timedef func(name):print('son 的名字是 %s '%name)time.sleep(5)print('這里是son')if __name__ == '__main__':p = Process(target = func,args=('Alex',))p.start()time.sleep(2)p.join()# 代碼執(zhí)行到這里,主進(jìn)程main會(huì)停止等待子進(jìn)程執(zhí)行完畢才繼續(xù)print('這里是father') from multiprocessing import Process import osdef func():print('我是子進(jìn)程,我的進(jìn)程id是%s,我爸爸的id是%s'%(os.getpid(),os.getppid()))if __name__ == '__main__':print('我是main爸爸,我的進(jìn)程id是%s' % os.getpid())for i in range(5):p = Process(target = func,args=())p.start() 查看子進(jìn)程和父進(jìn)程的id號(hào)進(jìn)階,多個(gè)進(jìn)程同時(shí)執(zhí)行(注意:子進(jìn)程的執(zhí)行順序并不受開(kāi)啟子進(jìn)程的順序影響)
from multiprocessing import Process import time def func(i):print('這里是第%s個(gè)子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':print('這里是main爸爸')for i in range(5):p = Process(target = func,args=(i,))p.start() 多個(gè)進(jìn)程同時(shí)執(zhí)行 from multiprocessing import Process import time def func(i):print('這里是第%s個(gè)子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':for i in range(5):p = Process(target = func,args=(i,))p.start()p.join()# main會(huì)停在這一句,等子進(jìn)程執(zhí)行完,再繼續(xù)走,也就是才再走下一次for循環(huán)print('這里是main爸爸') join在搞事情(1) from multiprocessing import Process import time def func(i):print('這里是第%s個(gè)子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':p_l = []for i in range(5):p = Process(target = func,args=(i,))p.start()p_l.append(p)[i.join() for i in p_l]print('這里是main爸爸')?上邊是直接開(kāi)啟多進(jìn)程,接下來(lái)介紹一個(gè)高大上的方法 -- 繼承Process類(lèi)的方式
from multiprocessing import Process import os class MyProcess(Process):def __init__(self,name):super().__init__()self.name = namedef run(self):print('我是%s,我正在和蒼老師談人生,我的id是%s'%(self.name,os.getpid()))if __name__ == '__main__':p1 = MyProcess('WuSir')p2 = MyProcess('邱老板')p3 = MyProcess('金老板')p1.start()# 調(diào)用start方法,start方法內(nèi)自動(dòng)調(diào)用run方法p2.start()# p2.run()p3.start()p1.join()p2.join()# 注意,如果調(diào)用run方法,就不能再調(diào)用join方法p3.join()print('我是main爸爸')繼承那些事(Process) 繼承那些事(Process)?多進(jìn)程之間關(guān)于數(shù)據(jù)隔離的那些年那些事兒
from multiprocessing import Processdef func():global nn = 0print('子進(jìn)程內(nèi) n = %s'%n)if __name__ == '__main__':n = 100p = Process(target=func)p.start()print('主進(jìn)程內(nèi) n = %s'%n)守護(hù)進(jìn)程? ? ?
會(huì)隨著父進(jìn)程的結(jié)束而結(jié)束。
父進(jìn)程創(chuàng)建守護(hù)進(jìn)程
其一:守護(hù)進(jìn)程會(huì)在父進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無(wú)法再開(kāi)啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進(jìn)程之間是互相獨(dú)立的,父進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止
from multiprocessing import Process import timedef func():print('子進(jìn)程開(kāi)始執(zhí)行')time.sleep(2)print('子進(jìn)程結(jié)束執(zhí)行')if __name__ == '__main__':print('主進(jìn)程開(kāi)始執(zhí)行')p = Process(target=func1,)p.daemon = True# 將p 設(shè)置為守護(hù)進(jìn)程,此代碼一定要在start之前設(shè)置。p.start()time.sleep(1)print('主進(jìn)程結(jié)束執(zhí)行') from multiprocessing import Process import timedef func2():print('子進(jìn)程2開(kāi)始執(zhí)行')time.sleep(2)print('子進(jìn)程2結(jié)束執(zhí)行')def func1():print('子進(jìn)程開(kāi)始執(zhí)行')time.sleep(2)print('子進(jìn)程結(jié)束執(zhí)行')if __name__ == '__main__':print('主進(jìn)程開(kāi)始執(zhí)行')p1 = Process(target=func1,)p2 = Process(target=func2)p1.daemon = True# 將p1 設(shè)置為守護(hù)進(jìn)程,此代碼一定要在start之前設(shè)置。p1.start()p2.start()time.sleep(1)# 此時(shí)p1 p2 和main 都已經(jīng)開(kāi)始執(zhí)行print('主進(jìn)程結(jié)束執(zhí)行')# 當(dāng)主進(jìn)程打印完這句話,代表主進(jìn)程結(jié)束,守護(hù)進(jìn)程p1肯定隨之結(jié)束# 但是p2 不是守護(hù)進(jìn)程,不會(huì)結(jié)束,所以此時(shí)程序(也就是主進(jìn)程)會(huì)等待p2結(jié)束之后才結(jié)束。socket tcp協(xié)議并發(fā)實(shí)現(xiàn)聊天??
from multiprocessing import Process import socket from socket import SOL_SOCKET,SO_REUSEADDR SERVER_ADDR = ('127.0.0.1',8080)sk = socket.socket() sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sk.bind(SERVER_ADDR)sk.listen(5)def func(conn):while 1:try:msg_r = conn.recv(1024).decode('utf-8')print(msg_r)if not msg_r:breakconn.send(msg_r.upper().encode('utf-8'))except:breakif __name__ == '__main__':while 1:conn,addr = sk.accept()p = Process(target=func,args=(conn,))p.start()sk.close()socket-tcp協(xié)議并發(fā)實(shí)現(xiàn)_server import socketsk = socket.socket() sk.connect(('127.0.0.1',8080))while 1:msg_s = input('>>>')if not msg_s:continuesk.send(msg_s.encode('utf-8'))print(sk.recv(1024).decode('utf-8')) sk.close()?多進(jìn)程中其他方法?
from multiprocessing import Process import random import timeclass MyProcess(Process):def __init__(self,name):super(MyProcess, self).__init__()self.name = name# name是父類(lèi)Process中的屬性,這里相當(dāng)于給子進(jìn)程命名def run(self):print('%s 正在撩小姐姐'%self.name)time.sleep(random.randint(1,3))print('%s 還在撩小姐姐'%self.name)if __name__ == '__main__':p = MyProcess('Alex')p.start()time.sleep(0.1)p.terminate()# 將p進(jìn)程殺死的命令。 將任務(wù)提交給操作系統(tǒng),操作系統(tǒng)什么時(shí)候執(zhí)行不受用戶決定print(p.is_alive())# 判斷p進(jìn)程是否還存在time.sleep(1)print(p.is_alive())# 判斷p進(jìn)程是否還存在進(jìn)程的terminate和is_alive方法 from multiprocessing import Process import random import timeclass MyProcess(Process):def __init__(self,name):super(MyProcess, self).__init__()self.name = name# name是父類(lèi)Process中的屬性,這里相當(dāng)于給子進(jìn)程命名def run(self):print('%s 正在撩小姐姐'%self.name)time.sleep(random.randint(1,3))print('%s 還在撩小姐姐'%self.name)if __name__ == '__main__':p = MyProcess('Alex')p.start()print(p.name,p.pid)# 打印進(jìn)程名字,進(jìn)程id號(hào)進(jìn)程的name和pid屬性進(jìn)程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event)
? ? ? ?在計(jì)算機(jī)中,有一些硬件和軟件,例如處理器、打印機(jī)等,都屬于競(jìng)爭(zhēng)類(lèi)資源,當(dāng)有需求時(shí),很多進(jìn)程都要爭(zhēng)搶這些資源,而對(duì)于這類(lèi)資源,就屬于臨界資源。當(dāng)多進(jìn)程共同處理某一個(gè)數(shù)據(jù)時(shí),這個(gè)數(shù)據(jù)也就屬于一個(gè)臨界資源。操作系統(tǒng)對(duì)計(jì)算機(jī)內(nèi)各種資源都使其在競(jìng)爭(zhēng)中有序化,但是對(duì)于數(shù)據(jù)來(lái)說(shuō),尤其是用戶動(dòng)態(tài)產(chǎn)生的數(shù)據(jù),當(dāng)處理時(shí)就變成了臨界資源,所以我們作為程序猿來(lái)說(shuō),需要對(duì)臨界資源加以保護(hù),否則就會(huì)出現(xiàn)數(shù)據(jù)混亂現(xiàn)象。這是在提高程序效率的優(yōu)勢(shì)下,帶來(lái)的一個(gè)隱患。小伙伴們,加油吧!
鎖 —— multiprocessing.Lock??
通過(guò)剛剛的學(xué)習(xí),我們千方百計(jì)實(shí)現(xiàn)了程序的異步,讓多個(gè)任務(wù)可以同時(shí)在幾個(gè)進(jìn)程中并發(fā)處理,他們之間的運(yùn)行沒(méi)有順序(或者說(shuō)由操作系統(tǒng)調(diào)度決定他們的順序),一旦開(kāi)啟也不受我們控制。盡管并發(fā)編程讓我們能更加充分的利用IO資源,但是也給我們帶來(lái)了新的問(wèn)題。
當(dāng)多個(gè)進(jìn)程使用同一份數(shù)據(jù)資源的時(shí)候,就會(huì)引發(fā)數(shù)據(jù)安全或順序混亂問(wèn)題。
from multiprocessing import Process import random import timedef func(addr):print('我是%s'%addr)time.sleep(random.random())print('謝謝!')if __name__ == '__main__':l = ['四川的','湖南的','河南的','江蘇的']for addr in l:p = Process(target=func,args=(addr,))p.start()time.sleep(2)print('\n\n我選%s'%random.choice(l)) # 關(guān)于搶占輸出資源的事情,是指多進(jìn)程并發(fā)執(zhí)行時(shí),并不是一個(gè)進(jìn)程執(zhí)行完任務(wù)后其他進(jìn)程再執(zhí)行。 # 比如 此程序會(huì)輸出:我是四川的 我是河南的 我是江蘇的 謝謝!謝謝!我是湖南的 謝謝! 謝謝! # 而不是 : 我是四川的 謝謝! 我是河南的 謝謝! ...多進(jìn)程關(guān)于搶占輸出資源的事情 from multiprocessing import Process import random import time from multiprocessing import Lock def func(addr,lock):lock.acquire()print('我是%s'%addr)time.sleep(random.random())print('謝謝!')lock.release()if __name__ == '__main__':lock = Lock()l = ['四川的','湖南的','河南的','江蘇的']for addr in l:p = Process(target=func,args=(addr,lock))p.start()time.sleep(4)print('\n\n我選%s'%random.choice(l))? 上面這種情況,使用了加鎖的形式確保了程序的順序執(zhí)行,但是執(zhí)行又變成了串行,降低了效率,但是不得不說(shuō),它確保了數(shù)據(jù)的安全性。
? ? ? 下面舉例來(lái)說(shuō)鎖的重要性:模擬12306搶票問(wèn)題。模擬銀行賬戶的存取款問(wèn)題
# 注意,文件中存儲(chǔ)需要以{'c':1}這種形式,c的引號(hào)一定要帶 # 否則json識(shí)別不出來(lái) # 此代碼的效果,并發(fā)執(zhí)行,但是多進(jìn)程同時(shí)讀寫(xiě)同一個(gè)文件數(shù)據(jù),造成數(shù)據(jù)混亂from multiprocessing import Process,Lock import json import timedef check(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)print('第%s個(gè)人在查票,余票為%s' % (i, dic['c']))pay(i,l)def pay(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)time.sleep(0.5)# 模擬網(wǎng)絡(luò)延遲,當(dāng)購(gòu)買(mǎi)過(guò)程中也會(huì)有網(wǎng)絡(luò)延遲if dic['c']:print('第%s個(gè)人買(mǎi)到票了 '%i)dic['c'] -= 1else:print('第%s個(gè)人沒(méi)買(mǎi)到票'%i)with open('a.txt','w') as f:json.dump(dic,f)if __name__ == '__main__':l = Lock()for i in range(10):p = Process(target=check,args=(i+1,l))p.start()多個(gè)人同時(shí)搶票很明顯,上述例子中,因?yàn)槎噙M(jìn)程同時(shí)對(duì)一個(gè)臨界資源(a.txt文件)進(jìn)行了讀寫(xiě)操作,使文件內(nèi)數(shù)據(jù)混亂,也造成了余票為1張,但是很多人都搶到票的假象。那就加鎖來(lái)解決它吧
from multiprocessing import Process,Lock import json import timedef check(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)print('第%s個(gè)人在查票,余票為%s' % (i, dic['c']))l.acquire()pay(i,l)# 為什么在這里加鎖? 因?yàn)槊總€(gè)人都可以查票,讀取數(shù)據(jù),不會(huì)造成數(shù)據(jù)混亂,但是當(dāng)買(mǎi)票的時(shí)候,就需要對(duì)臨界資源的寫(xiě)入,所以對(duì)寫(xiě)操作加鎖,使某一個(gè)進(jìn)程在寫(xiě)文件時(shí)候,其他進(jìn)程不能碰此文件。l.release()def pay(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)time.sleep(0.5)# 模擬網(wǎng)絡(luò)延遲,當(dāng)購(gòu)買(mǎi)過(guò)程中也會(huì)有網(wǎng)絡(luò)延遲if dic['c']:print('第%s個(gè)人買(mǎi)到票了 '%i)dic['c'] -= 1else:print('第%s個(gè)人沒(méi)買(mǎi)到票'%i)with open('a.txt','w') as f:json.dump(dic,f)if __name__ == '__main__':l = Lock()for i in range(10):p = Process(target=check,args=(i+1,l))p.start()加鎖解決買(mǎi)票問(wèn)題?關(guān)于銀行存取款的問(wèn)題。同一個(gè)賬戶,某個(gè)人一直存,某個(gè)人在同一時(shí)間一直取,如果不對(duì)數(shù)據(jù)進(jìn)行保護(hù)起來(lái),就會(huì)造成的一種數(shù)據(jù)混亂問(wèn)題。
from multiprocessing import Process, Lock,Valuedef save_money(num):for i in range(100):time.sleep(0.05)num.value += 1def draw_money(num):for i in range(100):time.sleep(0.05)num.value -= 1if __name__ == '__main__':num = Value('i',1000)# 多進(jìn)程中共享數(shù)據(jù),一個(gè)int類(lèi)型的數(shù)據(jù),1000man = Process(target=save_money,args=(num,))woman = Process(target=draw_money,args=(num,))man.start()woman.start()time.sleep(6)print(num.value)錢(qián)多錢(qián)少怪誰(shuí)? from multiprocessing import Process, Lock,Valuedef save_money(num,l):for i in range(100):time.sleep(0.05)l.acquire()num.value += 1l.release()def draw_money(num,l):for i in range(100):time.sleep(0.05)l.acquire()# 在操作存取款的數(shù)據(jù)時(shí),先將數(shù)據(jù)鎖住,不允許其他人更改此數(shù)據(jù)num.value -= 1l.release()if __name__ == '__main__':l = Lock()num = Value('i',1000)# 多進(jìn)程中共享數(shù)據(jù),一個(gè)int類(lèi)型的數(shù)據(jù),1000man = Process(target=save_money,args=(num,l))woman = Process(target=draw_money,args=(num,l))man.start()woman.start()time.sleep(6)print(num.value)這樣才對(duì)!!!信號(hào)量 —— multiprocessing.Semaphore(了解)
# 上述講的Lock,屬于互斥鎖,也就是一把鑰匙配備一把鎖,同時(shí)只允許鎖住某一個(gè)數(shù)據(jù)。而信號(hào)量則是多把鑰匙配備多把鎖,也就是說(shuō)同時(shí)允許鎖住多個(gè)數(shù)據(jù)。比如在一個(gè)粉紅發(fā)廊,里邊有5位服務(wù)人員,那么這個(gè)發(fā)廊最多就同時(shí)允許進(jìn)入5位客人,當(dāng)又有第6位客人來(lái)的時(shí)候,就需要在門(mén)外等待;當(dāng)服務(wù)人員服務(wù)完某位客人后,才允許后續(xù)的人再進(jìn)來(lái)一個(gè),換句話說(shuō),這個(gè)發(fā)廊最多同時(shí)接待5位客人,多的客人必須等待。信號(hào)量同步基于內(nèi)部計(jì)數(shù)器,用戶初始化一個(gè)計(jì)數(shù)器初值(比如上述例子中就初始化為5),每調(diào)用一次acquire(),計(jì)數(shù)器減1;每調(diào)用一次release(),計(jì)數(shù)器加1。當(dāng)計(jì)數(shù)器為0時(shí),acquire()調(diào)用被阻塞。這是迪科斯徹(Dijkstra)信號(hào)量概念P()和V()的Python實(shí)現(xiàn)。信號(hào)量同步機(jī)制適用于訪問(wèn)像服務(wù)器這樣的有限資源。信號(hào)量與進(jìn)程池的概念很像,但是要區(qū)分開(kāi),信號(hào)量涉及到加鎖的概念 信號(hào)量 Semaphore from multiprocessing import Semaphore from multiprocessing import Process import time import randomdef sing(i,se):se.acquire()# 每次進(jìn)來(lái)一位客人,信號(hào)量?jī)?nèi)部計(jì)數(shù)器減1print('%s進(jìn)入小黑屋'%i)time.sleep(random.randint(1,3))print('%s交錢(qián)走人'%i)se.release()# 每次離開(kāi)一位客人,信號(hào)量?jī)?nèi)部計(jì)數(shù)器加1if __name__ == '__main__':se = Semaphore(5)# 初始化5把鑰匙配備5把鎖for i in range(10): # 模擬10個(gè)人要進(jìn)入小黑屋子p = Process(target=sing,args=(i,se))p.start()信號(hào)量機(jī)制舉個(gè)栗子事件 —— multiprocessing.Event(了解)?
python中的事件機(jī)制,主要用于主進(jìn)程控制其他進(jìn)程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。事件處理的機(jī)制:全局定義了一個(gè)“Flag”(event.is_set()),如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為T(mén)rue,那么event.wait 方法時(shí)便不再阻塞。clear:將“Flag”設(shè)置為False set:將“Flag”設(shè)置為T(mén)rue is_set:返回全局‘Flag’的bool值事件 Event def traffic_lights(e):while 1:if e.is_set():# 如果True,代表e.wait() 不阻塞time.sleep(5)# 這段時(shí)間代表綠燈亮的時(shí)間,可以過(guò)車(chē)print('\033[31m紅燈亮!\033[0m')# 該紅燈亮了e.clear()# 將e.is_set()改成False,else:# 執(zhí)行car函數(shù)的進(jìn)程們,讀e.wait,因?yàn)樯线呉呀?jīng)是False,所以e.wait將會(huì)阻塞住time.sleep(5)# 紅燈亮的期間print('\033[32m綠燈亮!\033[0m')# 該綠燈亮了e.set()# 將e.is_set()設(shè)置成True,此時(shí)e.wait()將不再阻塞,車(chē)可以過(guò)def car(i,e):e.wait()# 讀取是否阻塞,如果阻塞,代表在print('第%s輛車(chē)過(guò)'%i)if __name__ == '__main__':e = Event()triff_light = Process(target=traffic_lights,args=(e,))triff_light.start()for i in range(50):cars = Process(target=car,args=(i,e))cars.start()時(shí)間機(jī)制舉個(gè)栗子進(jìn)程間通信——隊(duì)列和管道(multiprocess.Queue、multiprocess.Pipe)
?進(jìn)程間通信--IPC(Inter-Process Communication)
IPC的方法:此處介紹隊(duì)列和管道
隊(duì)列 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
概念:創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize]) 創(chuàng)建共享的進(jìn)程隊(duì)列。 參數(shù) :maxsize是隊(duì)列中允許存在的最大元素個(gè)數(shù)。如果省略此參數(shù),則無(wú)大小限制。 底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。 Queue([maxsize]) q = Queue([maxsize])q.get( [ block [ ,timeout ] ] ) 返回q中的一個(gè)項(xiàng)目。如果q為空,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為T(mén)rue. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時(shí)時(shí)間,用在阻塞模式中。如果在指定的時(shí)間間隔內(nèi)沒(méi)有項(xiàng)目可用,將引發(fā)Queue.Empty異常。q.get_nowait( ) 同q.get(False)方法。q.put(item [, block [,timeout ] ] ) 將item放入隊(duì)列。如果隊(duì)列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為T(mén)rue。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫(kù)模塊中)。timeout指定在阻塞模式中等待可用空間的時(shí)間長(zhǎng)短。超時(shí)后將引發(fā)Queue.Full異常。q.qsize() 返回隊(duì)列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。q.empty() 如果調(diào)用此方法時(shí) q為空,返回True。如果其他進(jìn)程或線程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說(shuō),在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目。q.full() 如果q已滿,返回為T(mén)rue. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()方法)。。方法介紹Queue中的方法介紹 q.close() 關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí),后臺(tái)線程將繼續(xù)寫(xiě)入那些已入隊(duì)列但尚未寫(xiě)入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將自動(dòng)調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類(lèi)型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤。q.cancel_join_thread() 不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程。這可以防止join_thread()方法阻塞。q.join_thread() 連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread()方法可以禁止這種行為。其他方法(了解)需要你了解的幾個(gè)方法?代碼示例??
from multiprocessing import Queueq = Queue(4) q.put(1) q.put(2) q.put(3) q.put(4) # q.put(5) #如果是put,因?yàn)橐呀?jīng)放滿數(shù)據(jù),所以程序會(huì)阻塞在put,等待取出數(shù)據(jù) # q.put_nowait(5) # 如果是put_nowait() ,不會(huì)阻塞,直接放入隊(duì)列數(shù)據(jù),隊(duì)列滿則報(bào)異常 try:q.put_nowait(5)# 在此處用try直接處理異常。此時(shí)數(shù)據(jù)不會(huì)放入到隊(duì)列,會(huì)被直接丟棄 except:print('隊(duì)列已滿')print(q.get()) print(q.get()) print(q.get()) print(q.get()) # print(q.get())# 此處和上邊一樣,因?yàn)殛?duì)列中已空,所以程序會(huì)阻塞在get,等待放入數(shù)據(jù) # q.get_nowait() # 不會(huì)阻塞,直接從隊(duì)列中獲取數(shù)據(jù),獲取不到則報(bào)錯(cuò) try:q.get_nowait()# 在此處用try處理錯(cuò)誤,此時(shí)獲取不到數(shù)據(jù),直接跳過(guò) except:print('隊(duì)列已空')了解隊(duì)列的用法 了解隊(duì)列的用法?上面這個(gè)例子還沒(méi)有加入進(jìn)程通信,只是先來(lái)看看隊(duì)列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
from multiprocessing import Queue,Process import time def func(q):# time.sleep(1)q.put('我是四川的')if __name__ == '__main__':q = Queue(5)p = Process(target=func,args=(q,))p.start()#print(q.get_nowait())# 此處,可能會(huì)報(bào)錯(cuò),因?yàn)樽舆M(jìn)程和父進(jìn)程同時(shí)運(yùn)行,不一定隊(duì)列中有數(shù)據(jù)print(q.get())# 此處一定不會(huì)報(bào)錯(cuò),因?yàn)間et是阻塞獲取數(shù)據(jù),如果隊(duì)列沒(méi)有就等著多進(jìn)程中使用隊(duì)列 多進(jìn)程中使用隊(duì)列上面是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來(lái)取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。 接下來(lái)看一個(gè)稍微復(fù)雜一些的例子:
from multiprocessing import Process, Queue,freeze_support import random import osdef put_func(q):info = str(os.getpid()) + '\t:\t' + str(random.randint(0, 100))q.put(info)def get_func(q):print('%s 獲取到數(shù)據(jù) :\033[33m; %s \033[0m' % (os.getpid(), q.get()))if __name__ == '__main__':# freeze_support() 如果有windows系統(tǒng)開(kāi)啟多進(jìn)程導(dǎo)致程序崩潰,可嘗試調(diào)用此函數(shù)q = Queue(5)l_put = []l_get = []for i in range(10):p_put = Process(target=put_func, args=(q,))p_put.start()l_put.append(p_put)for i in range(10):p_get = Process(target=get_func, args=(q,))p_get.start()l_put.append(p_get)# [i.join() for i in l_put]# [i.join() for i in l_get]批量數(shù)據(jù)放入隊(duì)列并批量獲取 批量數(shù)據(jù)放入隊(duì)列并批量獲取生產(chǎn)者消費(fèi)者模型 ? ??
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問(wèn)題。該模式通過(guò)平衡生產(chǎn)進(jìn)程和消費(fèi)進(jìn)程的工作能力來(lái)提高程序的整體處理數(shù)據(jù)的速度。
舉個(gè)應(yīng)用栗子:全棧開(kāi)發(fā)時(shí)候,前端接收客戶請(qǐng)求,后端處理請(qǐng)求邏輯。當(dāng)某時(shí)刻客戶請(qǐng)求過(guò)于多的時(shí)候,后端處理不過(guò)來(lái),此時(shí)完全可以借助隊(duì)列來(lái)輔助,將客戶請(qǐng)求放入隊(duì)列中,后端邏輯代碼處理完一批客戶請(qǐng)求后馬上從隊(duì)列中繼續(xù)獲取,這樣平衡兩端的效率。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在進(jìn)程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進(jìn)程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的進(jìn)程。在多進(jìn)程開(kāi)發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問(wèn)題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()print('\033[32m廠長(zhǎng)拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號(hào)'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()基于隊(duì)列的生產(chǎn)者消費(fèi)者模型?上述代碼是基于隊(duì)列實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型,生產(chǎn)者一直在生產(chǎn)娃娃,消費(fèi)者一直在從隊(duì)列中獲取娃娃,但是消費(fèi)者因?yàn)椴恢郎a(chǎn)者要生產(chǎn)多少娃娃,也不知道生產(chǎn)者何時(shí)就不生產(chǎn)了,所以消費(fèi)者需要一個(gè)死循環(huán)一直嘗試去從隊(duì)列中獲取娃娃,那么此時(shí)問(wèn)題就出現(xiàn)了,3個(gè)進(jìn)程,主進(jìn)程開(kāi)啟了兩個(gè)子進(jìn)程分別為生產(chǎn)者和消費(fèi)者,當(dāng)生產(chǎn)者生產(chǎn)完數(shù)據(jù)后,生產(chǎn)者結(jié)束,消費(fèi)者一直在嘗試接收數(shù)據(jù),那么問(wèn)題就出在了消費(fèi)者的get方法這里,當(dāng)get不到數(shù)據(jù)時(shí)候就一直阻塞,那么主進(jìn)程就一直等待,此時(shí)程序就不會(huì)結(jié)束了。
解決方法也很簡(jiǎn)單,可以嘗試讓生產(chǎn)者在生產(chǎn)完數(shù)據(jù)后,再往隊(duì)列中放一個(gè)結(jié)束生產(chǎn)的信號(hào),當(dāng)消費(fèi)者接受到信號(hào)后,自動(dòng)的break出死循環(huán)即可。
from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識(shí)時(shí),消費(fèi)者自動(dòng)break出死循環(huán)print('\033[32m廠長(zhǎng)拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號(hào)'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))q.put(None)# 放入一個(gè)結(jié)束生產(chǎn)的標(biāo)識(shí)if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()修正版-消費(fèi)者生產(chǎn)者模型?注意:上述代碼中,生產(chǎn)者放入的停止生產(chǎn)的標(biāo)識(shí),放入標(biāo)識(shí)這件事其實(shí)交給主進(jìn)程來(lái)做也可以,但是此時(shí)就需要主進(jìn)程獲取到生產(chǎn)者什么時(shí)候結(jié)束生產(chǎn)。
from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識(shí)時(shí),消費(fèi)者自動(dòng)break出死循環(huán)print('\033[32m廠長(zhǎng)拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號(hào)'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()p_put.join()# 讓主進(jìn)程可以獲取到生產(chǎn)者結(jié)束生產(chǎn)q.put(None)主進(jìn)程發(fā)送結(jié)束生產(chǎn)標(biāo)識(shí)?當(dāng)小伙子們嘗試上述這個(gè)代碼時(shí),發(fā)現(xiàn)所有問(wèn)題都阻擋不了你成功的腳步了,但是!!你有沒(méi)有嘗試過(guò)多個(gè)生產(chǎn)者多個(gè)消費(fèi)者的模型?what fuck? 現(xiàn)在的解決方案是不是就很 low bee了!因?yàn)槟銜?huì)發(fā)現(xiàn):(標(biāo)題)
from multiprocessing import Queue,Process import time import randomdef get_func(q,consumer):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識(shí)時(shí),消費(fèi)者自動(dòng)break出死循環(huán)print('\033[32m%s 拿走了%s \033[0m'%(consumer,info))def put_func(q,product):for i in range(5):info = '%s %s號(hào)'%(product,i)q.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)con1 = Process(target=get_func, args=(q,'Alex'))con2 = Process(target=get_func, args=(q,'WuSir'))pro1 = Process(target=put_func, args=(q,'蒼老師版'))pro2 = Process(target=put_func, args=(q,'小澤瑪雅麗版'))pro3 = Process(target=put_func, args=(q,'島國(guó)米飯保你愛(ài)版'))pro1.start()pro2.start()pro3.start()con1.start()con2.start()pro1.join()# 讓主進(jìn)程可以獲取到生產(chǎn)者結(jié)束生產(chǎn)pro2.join()# 主進(jìn)程必須等待所有生產(chǎn)者生產(chǎn)完畢后才可以放結(jié)束生產(chǎn)的信號(hào)pro3.join()q.put(None)# 有幾個(gè)消費(fèi)者就應(yīng)該給幾個(gè)信號(hào)q.put(None)多個(gè)消費(fèi)者:有幾個(gè)消費(fèi)者就應(yīng)該放幾個(gè)結(jié)束生產(chǎn)標(biāo)識(shí)JoinableQueue([maxsize])?
?創(chuàng)建可連接的共享隊(duì)列進(jìn)程。它就好像一個(gè)Queue對(duì)象,但是它自帶光環(huán),允許消費(fèi)者通知生產(chǎn)者是不是已經(jīng)消費(fèi)完所有的數(shù)據(jù)了。通知進(jìn)程是使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。?
老樣子,先來(lái)接收一下JoinableQueue給咱們提供的方法:
JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外,還具有以下方法:q.task_done() 消費(fèi)者使用此方法發(fā)出信號(hào),表示隊(duì)列中放入的數(shù)據(jù)已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中獲取的數(shù)據(jù)數(shù)量,將引發(fā)ValueError異常。q.join() 生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。阻塞將持續(xù)到消費(fèi)者為隊(duì)列中的每個(gè)數(shù)據(jù)均調(diào)用q.task_done()方法為止。 方法介紹下面舉個(gè)栗子,建立一個(gè)永遠(yuǎn)運(yùn)行的進(jìn)程,來(lái)無(wú)限制的消費(fèi)生產(chǎn)者的數(shù)據(jù),生產(chǎn)者只需要將數(shù)據(jù)放入隊(duì)列并等待被處理即可
from multiprocessing import JoinableQueue,Process import time import randomdef get_func(q,consumer):while 1:time.sleep(random.randint(1, 3))info = q.get()print('\033[32m%s 拿走了%s \033[0m'%(consumer,info))q.task_done()# 消費(fèi)者每消費(fèi)一個(gè)數(shù)據(jù),就要返回一次標(biāo)識(shí)給q.join,證明數(shù)據(jù)已經(jīng)被消費(fèi)def put_func(q,product):for i in range(5):info = '%s %s號(hào)'%(product,i)q.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))q.join()# 生產(chǎn)完畢,使用此方法進(jìn)行阻塞,直到隊(duì)列中所有數(shù)據(jù)均被處理。if __name__ == '__main__':q = JoinableQueue(5)con1 = Process(target=get_func, args=(q,'Alex'))con2 = Process(target=get_func, args=(q,'WuSir'))pro1 = Process(target=put_func, args=(q,'蒼老師版'))pro2 = Process(target=put_func, args=(q,'小澤瑪雅麗版'))pro3 = Process(target=put_func, args=(q,'島國(guó)米飯保你愛(ài)版'))l_p = [pro1,pro2,pro3,con1,con2]for i in l_p:i.start()pro1.join()pro2.join()pro3.join() # 此代碼的大體邏輯:主進(jìn)程開(kāi)啟3個(gè)生產(chǎn)者和2個(gè)消費(fèi)者,合計(jì)6個(gè)進(jìn)程在并發(fā)執(zhí)行。 # 主進(jìn)程等待 pro1 pro2 pro3的執(zhí)行完畢 # pro1 pro2 pro3 進(jìn)程中都有q.join(),會(huì)等待消費(fèi)者con1 con2消費(fèi)完所以數(shù)據(jù) # 即 :主進(jìn)程在等pro1 pro2 pro3 ,而pro1 pro2 pro3 在等 con1 con2 # 此代碼中消費(fèi)者進(jìn)程con1 con2 會(huì)一直阻塞在q.get等待接收數(shù)據(jù),而造成程序不會(huì)結(jié)束 # 如果程序想要結(jié)束.......小伙子,試試把con1 con2設(shè)置成守護(hù)進(jìn)程試試JoinableQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型管道(了解)?
#創(chuàng)建管道的類(lèi): Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道 #參數(shù)介紹: dumplex:默認(rèn)管道是全雙工的,即conn1和conn2都是既能收數(shù)據(jù)也能發(fā)數(shù)據(jù)的,如果將duplex設(shè)成False,conn1只能用于接收,conn2只能用于發(fā)送。 #主要方法:conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒(méi)有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。conn1.send(obj):通過(guò)連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象#其他方法: conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法 conn1.fileno():返回連接使用的整數(shù)文件描述符 conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout設(shè)成None,操作將無(wú)限期地等待數(shù)據(jù)到達(dá)。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過(guò)了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫(xiě)入的緩沖區(qū)接口(即bytearray對(duì)象或類(lèi)似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。介紹管道的介紹 from multiprocessing import Process, Pipe import timedef son(conn2):time.sleep(1)print(conn2.recv())conn2.send('我是你爺爺')if __name__ == '__main__':conn1, conn2 = Pipe()p = Process(target=son, args=(conn2,))p.start()conn1.send('我是你爸爸!') # conn1既能發(fā)數(shù)據(jù)time.sleep(1)print(conn1.recv()) # conn1 也能接數(shù)據(jù)p.join()管道的初使用?應(yīng)該特別注意管道端點(diǎn)的正確管理問(wèn)題。如果是生產(chǎn)者或消費(fèi)者中都沒(méi)有使用管道的某個(gè)端點(diǎn),就應(yīng)將它關(guān)閉。這也說(shuō)明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端,在消費(fèi)者中關(guān)閉管道的輸入端。如果忘記執(zhí)行這些步驟,程序可能在消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生成EOFError異常。因此,在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果,除非消費(fèi)者也關(guān)閉了相同的管道端點(diǎn)。
from multiprocessing import Process, Pipedef son(conn2,conn1):# conn1.close() # 不寫(xiě)就不會(huì)發(fā)生EOFError異常while 1:try:print(conn2.recv())conn2.send('我是你爺爺')except EOFError:# print(123) # 驗(yàn)證確實(shí)發(fā)生了EOFError異常conn2.close()breakif __name__ == '__main__':conn1, conn2 = Pipe()p = Process(target=son, args=(conn2,conn1))p.start()conn2.close()conn1.send('我是你爸爸!') # conn1既能發(fā)數(shù)據(jù)print(conn1.recv()) # conn1 也能接數(shù)據(jù)conn1.close()自己整個(gè)異常玩玩 from multiprocessing import Pipe, Process import time import randomdef consumer(conn, man):conn1, conn2 = connconn1.close()while 1:try:info = conn2.recv()time.sleep(random.random())print('\033[31m%s 取走了%s\033[0m' % (man, info))except EOFError:print('\033[32m沒(méi)有娃娃了,等下一期吧\033[0m')conn2.close()break # 借助一定產(chǎn)生的EOFError異常來(lái)讓程序結(jié)束def producer(conn, pro):conn1, conn2 = connconn2.close()for i in range(10):info = '%s 的娃娃%s號(hào)' % (pro, i)conn1.send(info)conn1.close()if __name__ == '__main__':conn1, conn2 = Pipe()cons1 = Process(target=consumer, args=((conn1, conn2), 'alex'))prod1 = Process(target=producer, args=((conn1, conn2), '蒼老師版'))cons1.start()prod1.start()conn1.close() # 不寫(xiě)的話,消費(fèi)者進(jìn)程結(jié)束不了。 一定要記住,多進(jìn)程中內(nèi)核會(huì)對(duì)每個(gè)進(jìn)程的管道進(jìn)行計(jì)數(shù),必須在所有進(jìn)程中都關(guān)閉管道才會(huì)引發(fā)EOFError異常conn2.close()pipe實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型 from multiprocessing import Pipe, Process, Lockdef consumer(conn, man,color,l):conn1, conn2 = connconn1.close()while 1:l.acquire()info = conn2.recv()l.release()if info:print('\033[%s %s 取走了%s\033[0m' % (color,man, info))else:conn2.close()breakdef producer(conn, pro):conn1, conn2 = connconn2.close()for i in range(20):info = '%s 的娃娃%s號(hào)' % (pro, i)conn1.send(info)conn1.send(None)conn1.send(None)conn1.send(None)conn1.close()if __name__ == '__main__':conn1, conn2 = Pipe()l = Lock()cons1 = Process(target=consumer, args=((conn1, conn2), 'alex','31m',l))cons2 = Process(target=consumer, args=((conn1, conn2), 'Wusir','33m',l))cons3 = Process(target=consumer, args=((conn1, conn2), '太白','34m',l))cons4 = Process(target=consumer, args=((conn1, conn2), '彥濤','35m',l))cons5 = Process(target=consumer, args=((conn1, conn2), 'AAA','36m',l))cons6 = Process(target=consumer, args=((conn1, conn2), 'BBB','37m',l))prod1 = Process(target=producer, args=((conn1, conn2), '蒼老師版'))prod2 = Process(target=producer, args=((conn1, conn2), '韓紅版'))l_p = [cons1,cons2,cons3,cons4,cons5,cons6,prod1,prod2,][i.start() for i in l_p]conn1.close()conn2.close()[i.join() for i in l_p]多個(gè)消費(fèi)者競(jìng)爭(zhēng)數(shù)據(jù)帶來(lái)了數(shù)據(jù)不安全的問(wèn)題進(jìn)程直接的數(shù)據(jù)共享 ? ? ? ? ? ?
展望未來(lái),基于消息傳遞的并發(fā)編程是大勢(shì)所趨
即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合,通過(guò)消息隊(duì)列交換數(shù)據(jù)。
這樣極大地減少了對(duì)使用鎖定和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。
但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來(lái)避免加鎖帶來(lái)的問(wèn)題。
以后我們會(huì)嘗試使用數(shù)據(jù)庫(kù)來(lái)解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問(wèn)題。
進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的 雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過(guò)Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.Manager模塊官方說(shuō)法 from multiprocessing import Manager,Process,Lockdef func(dic,lock):# lock.acquire() # 使用manager模塊,多進(jìn)程共享數(shù)據(jù)時(shí),如果不加鎖,必然會(huì)造成數(shù)據(jù)混亂dic[0] -= 1# lock.release()if __name__ == '__main__':m = Manager()lock = Lock()s = m.list([50])print(s)l = []for i in range(50):p = Process(target=func,args=(s,lock))p.start()l.append(p)[p.join() for p in l]print(s[0])Manager模塊的簡(jiǎn)單用法進(jìn)程池和multiprocessing.Pool 模塊 ? ? ?
進(jìn)程池 ? ? ? ?
為什么要有進(jìn)程池?進(jìn)程池的概念。
在程序?qū)嶋H處理問(wèn)題過(guò)程中,忙時(shí)會(huì)有成千上萬(wàn)的任務(wù)需要被執(zhí)行,閑時(shí)可能只有零星任務(wù)。那么在成千上萬(wàn)個(gè)任務(wù)需要被執(zhí)行的時(shí)候,我們就需要去創(chuàng)建成千上萬(wàn)個(gè)進(jìn)程么?首先,創(chuàng)建進(jìn)程需要消耗時(shí)間,銷(xiāo)毀進(jìn)程也需要消耗時(shí)間。第二即便開(kāi)啟了成千上萬(wàn)的進(jìn)程,操作系統(tǒng)也不能讓他們同時(shí)執(zhí)行,這樣反而會(huì)影響程序的效率。因此我們不能無(wú)限制的根據(jù)任務(wù)開(kāi)啟或者結(jié)束進(jìn)程。那么我們要怎么做呢?
在這里,要給大家介紹一個(gè)進(jìn)程池的概念,定義一個(gè)池子,在里面放上固定數(shù)量的進(jìn)程,有需求來(lái)了,就拿這個(gè)池中的進(jìn)程來(lái)處理任務(wù),等到處理完畢,進(jìn)程并不結(jié)束,而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠,任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來(lái),拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。也就是說(shuō),池中進(jìn)程的數(shù)量是固定的,那么同一時(shí)間最多有固定數(shù)量的進(jìn)程在運(yùn)行。這樣不會(huì)增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開(kāi)閉進(jìn)程的時(shí)間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。
舉個(gè)栗子:在小黑屋子里(池)有4個(gè)小姐姐(進(jìn)程)一直在等待服務(wù)客人,當(dāng)有1個(gè)客人來(lái),就派一個(gè)小姐姐去服務(wù),當(dāng)有很多人來(lái),就得等哪個(gè)小姐姐服務(wù)完上個(gè)人之后再繼續(xù)去服務(wù)下一個(gè)人。
multiprocessing.Pool 模塊
p = Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池1 numprocess:要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值 2 initializer:是每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None 3 initargs:是要傳給initializer的參數(shù)組 p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過(guò)不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]],callback=None):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''此方法的結(jié)果是AsyncResult類(lèi)的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將結(jié)果傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。'''p.map( func, iterable):將iterable序列中每一個(gè)元素當(dāng)成參數(shù)傳遞給func,異步執(zhí)行func函數(shù)。p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用主要方法Pool主要方法 方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法:1、obj.get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒(méi)有到達(dá),將引發(fā)一場(chǎng)。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。 2、obj.ready():如果調(diào)用完成,返回True 3、obj.successful():如果調(diào)用完成且沒(méi)有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 4、obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?5、obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)其他方法(了解即可)代碼實(shí)例 ? ? ? ?
進(jìn)程池和多進(jìn)程效率對(duì)比
from multiprocessing import Pool,Process import time def func(i):i += 1print(i)if __name__ == '__main__':p = Pool(4)# 創(chuàng)建進(jìn)程池,池中有4個(gè)進(jìn)程待命start = time.time()t = [i for i in range(100)]p.map(func,t)#p.close()p.join()# 等待進(jìn)程池中所有進(jìn)程都執(zhí)行完所有任務(wù)。print(time.time() - start)# 打印進(jìn)程池做任務(wù)的時(shí)間start = time.time()l = []for i in range(100):p = Process(target=func,args=(i,))p.start()l.append(p)[i.join() for i in l]# 等待所有進(jìn)程工作結(jié)束print(time.time() - start)# 打印開(kāi)啟100個(gè)進(jìn)程做任務(wù)的時(shí)間print('--'*20+'>')map進(jìn)程池和多進(jìn)程做任務(wù)的效率對(duì)比?進(jìn)程池的同步調(diào)用和異步調(diào)用
from multiprocessing import Pool,Process import time,osdef func(i):i+= 1time.sleep(2)print(i,os.getpid())return i if __name__ == '__main__':p = Pool(5)# 創(chuàng)建進(jìn)程池,池中有5個(gè)進(jìn)程待命res_l = []for i in range(10):res = p.apply(func,args=(i,))# 有10個(gè)任務(wù)來(lái),交給進(jìn)程池中5個(gè)進(jìn)程,5個(gè)進(jìn)程同步執(zhí)行任務(wù)直到拿到resres_l.append(res)print(res_l)進(jìn)程池的同步調(diào)用 from multiprocessing import Pool,Process import time,osdef func(i):i+= 1time.sleep(2)print(i,os.getpid())return i if __name__ == '__main__':p = Pool(5)# 創(chuàng)建進(jìn)程池,池中有5個(gè)進(jìn)程待命res_l = []for i in range(10):res = p.apply_async(func,args=(i,))# 異步執(zhí)行10個(gè)任務(wù),每次最多5個(gè)進(jìn)程同時(shí)運(yùn)行res_l.append(res)# 拿到結(jié)果后是一個(gè)AsyncResul的實(shí)例obj,先將結(jié)果放入列表p.close()p.join()# 異步執(zhí)行需要join,也就是讓主進(jìn)程等待進(jìn)程池中所有進(jìn)程執(zhí)行完所有任務(wù),否則可能進(jìn)程池中的進(jìn)程還沒(méi)來(lái)得及執(zhí)行任務(wù),主進(jìn)程就結(jié)束了。for i in res_l:print(i.get())# 異步機(jī)制,從AsyncResul的實(shí)例obj中g(shù)et到實(shí)際結(jié)果,同步機(jī)制沒(méi)有此方法# 因?yàn)橥綑C(jī)制能直接拿到實(shí)際結(jié)果# 其實(shí)get是阻塞等待的,也就是說(shuō),如果沒(méi)有上邊的close和join :# 主進(jìn)程一樣會(huì)阻塞在get等待進(jìn)程池中給返回結(jié)果,進(jìn)程池異步執(zhí)行任務(wù)獲取結(jié)果# 每次有一個(gè)進(jìn)程返回結(jié)果后,就能get到一個(gè)結(jié)果,然后for循環(huán)到下一次繼續(xù)阻塞等待拿結(jié)果進(jìn)程池的異步調(diào)用?練習(xí):進(jìn)程池實(shí)現(xiàn)socket并發(fā)聊天
from multiprocessing import Pool import socketSOURCE_ADDR = ('127.0.0.1', 8090)def communication(conn):while 1:try:msg_r = conn.recv(1024).decode('utf-8')if not msg_r: breakprint(msg_r)conn.send(msg_r.upper().encode('utf-8'))except Exception: # 可能接收比如客戶端直接強(qiáng)制斷開(kāi)連接等錯(cuò)誤。print('結(jié)束')breakif __name__ == '__main__':p = Pool(4)sk = socket.socket()sk.bind(SOURCE_ADDR)sk.listen()while 1:conn, addr = sk.accept()p.apply_async(communication, args=(conn,))server端 import socketsk = socket.socket() SOURCE_ADDR = ('127.0.0.1',8090) sk.connect(SOURCE_ADDR)while 1:msg_s = input('>>>')if not msg_s:continuesk.send(msg_s.encode('utf-8'))print(sk.recv(1024).decode('utf-8'))client端?上述代碼體現(xiàn)出:并發(fā)開(kāi)啟多個(gè)客戶端,服務(wù)端同一時(shí)間只能接收4個(gè)客戶端的請(qǐng)求,當(dāng)再有客戶端請(qǐng)求時(shí),需要等待,只能結(jié)束一個(gè)客戶端,另外一個(gè)客戶端才會(huì)進(jìn)來(lái).
需要回調(diào)函數(shù)的場(chǎng)景:進(jìn)程池中任何一個(gè)任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個(gè)函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)我們可以把耗時(shí)間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時(shí)就省去了I/O的過(guò)程,直接拿到的是任務(wù)的結(jié)果需要注意的是:在進(jìn)程池中,回調(diào)函數(shù)是由主函數(shù)調(diào)用的! import requests from multiprocessing import Pool,Process import time import os def get_url(url):while 1:r = requests.get(url)if r.status_code == 200:return url,r.textdef write_res(result):info = '%s : %s'%(result[0],result[1])with open('info.txt','a',encoding='utf-8') as f:f.write(info)# print('回調(diào)函數(shù)pid:%s'%(os.getpid()))if __name__ == '__main__':p = Pool(4)# print('主函數(shù)pid:%s'%(os.getpid()))urls = ['https://www.baidu.com','http://www.python.org','http://www.jd.com','http://www.taobao.com','http://www.mi.com','http://www.cnblogs.com']res_l = []for url in urls:res = p.apply_async(get_url,args=(url,),callback=write_res)res_l.append(res)p.close()p.join()for i in res_l:print(i.get()[0])使用進(jìn)程池請(qǐng)求多個(gè)url以減少網(wǎng)絡(luò)延遲等待 import requests import bs4 from multiprocessing import Poolheader = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0' }# 請(qǐng)求頭def get_img_url(url):'''獲取目標(biāo)網(wǎng)頁(yè)中,每個(gè)目的圖片的準(zhǔn)確地址'''img_url = []res = requests.get(url, headers=header)if res.status_code == 200:soup = bs4.BeautifulSoup(res.text, 'html.parser')img_addr = soup.find('div', class_='beauty_details_imgs_box').find_all('li')for i in img_addr:img = i.find('img').get('src')img_url.append(img)return img_urldef get_img(img_url):'''進(jìn)程池異步請(qǐng)求 每個(gè)目的圖片的準(zhǔn)確地址'''res = requests.get(img_url)if res.status_code == 200:return res, img_urldef save_img(img_url):'''回調(diào)函數(shù),每有一個(gè)進(jìn)程獲取到圖片地址后,將圖片寫(xiě)入本地'''res, img = img_url[0], img_url[1]with open(img.split('/')[-1].split('_')[0], 'wb') as f:f.write(res.content)if __name__ == '__main__':url = 'http://www.xiao4j.com/beauty/photos/30687.html'p = Pool(4)img_addr = get_img_url(url)for img in img_addr:p.apply_async(get_img, args=(img,), callback=save_img)p.close()p.join()爬妹子 爬妹子
?在進(jìn)程池中,如果對(duì)于每個(gè)進(jìn)程返回的結(jié)果并不需要及時(shí)處理,需要等待所有結(jié)果完成后再統(tǒng)一處理的話,就無(wú)需回調(diào)函數(shù)了。
def func(num):sum = 0for i in range(num):sum += ireturn sumif __name__ == '__main__':p = Pool(4)res = []for i in range(20):r = p.apply_async(func,args=(i,))res.append(r)p.close()p.join()[print(i.get()) for i in res]不需要用回調(diào)函數(shù)進(jìn)程池的其他實(shí)現(xiàn)方式:https://docs.python.org/dev/library/concurrent.futures.html
?
參考資料:
http://www.cnblogs.com/linhaifeng/articles/6817679.html
http://www.cnblogs.com/Eva-J/articles/8253549.html
https://www.jianshu.com/p/1200fd49b583
?
轉(zhuǎn)載于:https://www.cnblogs.com/zhaoyang110/p/9508273.html
總結(jié)
以上是生活随笔為你收集整理的python之路-进程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 隐藏此电脑中多出来的 3D 对象、视频、
- 下一篇: 分享一个免费的听书、说书(文字转语音)、