python -- 进程
什么是進程
要了解進程先了解操作系統,可以參考一下我的上一篇博客:操作系統
進程(Process)是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。
狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。 廣義定義:進程是一個具有一定獨立功能的程序關于某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。 ''' 進程的概念 第一,進程是一個實體。每一個進程都有它自己的地址空間,一般情況下,包括文本區域(text region)、數據區域(data region)和堆棧(stack region)。文本區域存儲處理器執行的代碼;數據區域存儲變量和進程執行期間使用的動態分配的內存;堆棧區域存儲著活動過程調用的指令和本地變量。 第二,進程是一個“執行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操作系統執行之),它才能成為一個活動的實體,我們稱其為進程。[3] 進程是操作系統中最基本、重要的概念。是多道程序系統出現后,為了刻畫系統內部出現的動態情況,描述系統內部各道程序的活動規律引進的一個概念,所有多道程序設計操作系統都建立在進程的基礎上。操作系統引入進程的概念的原因 從理論角度看,是對正在運行的程序過程的抽象; 從實現角度看,是一種數據結構,目的在于清晰地刻畫動態系統的內在規律,有效管理和調度進入計算機系統主存儲器運行的程序。進程的特征 動態性:進程的實質是程序在多道程序系統中的一次執行過程,進程是動態產生,動態消亡的。 并發性:任何進程都可以同其他進程一起并發執行 獨立性:進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位; 異步性:由于進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進 結構特征:進程由程序、數據和進程控制塊三部分組成。 多個不同的進程可以包含相同的程序:一個程序在不同的數據集里就構成不同的進程,能得到不同的結果;但是執行過程中,程序不能發生改變。進程與程序中的區別 程序是指令和數據的有序集合,其本身沒有任何運行的含義,是一個靜態的概念。 而進程是程序在處理機上的一次執行過程,它是一個動態的概念。 程序可以作為一種軟件資料長期存在,而進程是有一定生命期的。 程序是永久的,進程是暫時的。注意:同一個程序執行兩次,就會在操作系統中出現兩個進程,所以我們可以同時運行一個軟件,分別做不同的事情也不會混亂。 ''' 進程進程調度
要想多個進程交替運行,操作系統必須對這些進程進行調度,這個調度也不是隨即進行的,而是需要遵循一定的法則,由此就有了進程的調度算法。
''' 先來先服務調度算法 先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可用于作業調度,也可用于進程調度。FCFS算法比較有利于長作業(進程),而不利于短作業(進程)。由此可知,本算法適合于CPU繁忙型作業,而不利于I/O繁忙型的作業(進程)。 '''''' 短作業優先調度算法 短作業(進程)優先調度算法(SJ/PF)是指對短作業或短進程優先調度的算法,該算法既可用于作業調度,也可用于進程調度。但其對長作業不利;不能保證緊迫性作業(進程)被及時處理;作業的長短只是被估算出來的。 '''''' 時間片輪轉法時間片輪轉(Round Robin,RR)法的基本思路是讓每個進程在就緒隊列中的等待時間與享受服務的時間成比例。在時間片輪轉法中,需要將CPU的處理時間分成固定大小的時間片,例如,幾十毫秒至幾百毫秒。如果一個進程在被調度選中之后用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放自己所占有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。顯然,輪轉法只能用來調度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪,而且可以將它們再分配給別的進程。CPU是可搶占資源的一種。但打印機等資源是不可搶占的。由于作業調度是對除了CPU之外的所有系統硬件資源的分配,其中包含有不可搶占資源,所以作業調度不使用輪轉法。 在輪轉法中,時間片長度的選取非常重要。首先,時間片長度的選擇會直接影響到系統的開銷和響應時間。如果時間片長度過短,則調度程序搶占處理機的次數增多。這將使進程上下文切換次數也大大增加,從而加重系統開銷。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執行時間最長的進程能執行完畢,則輪轉法變成了先來先服務法。時間片長度的選擇是根據系統對響應時間的要求和就緒隊列中所允許最大的進程數來確定的。在輪轉法中,加入到就緒隊列的進程有3種情況:一種是分給它的時間片用完,但進程還未完成,回到就緒隊列的末尾等待下次調度去繼續執行。另一種情況是分給該進程的時間片并未用完,只是因為請求I/O或由于進程的互斥與同步關系而被阻塞。當阻塞解除之后再回到就緒隊列。第三種情況就是新創建進程進入就緒隊列。如果對這些進程區別對待,給予不同的優先級和時間片從直觀上看,可以進一步改善系統服務質量和效率。例如,我們可把就緒隊列按照進程到達就緒隊列的類型和進程被阻塞時的阻塞原因分成不同的就緒隊列,每個隊列按FCFS原則排列,各隊列之間的進程享有不同的優先級,但同一隊列內優先級相同。這樣,當一個進程在執行完它的時間片之后,或從睡眠中被喚醒以及被創建之后,將進入不同的就緒隊列。 '''''' 多級反饋隊列前面介紹的各種用作進程調度的算法都有一定的局限性。如短進程優先的調度算法,僅照顧了短進程而忽略了長進程,而且如果并未指明進程的長度,則短進程優先和基于進程長度的搶占式調度算法都將無法使用。 而多級反饋隊列調度算法則不必事先知道各種進程所需的執行時間,而且還可以滿足各種類型進程的需要,因而它是目前被公認的一種較好的進程調度算法。在采用多級反饋隊列調度算法的系統中,調度算法的實施過程如下所述。 (1) 應設置多個就緒隊列,并為各個隊列賦予不同的優先級。第一個隊列的優先級最高,第二個隊列次之,其余各隊列的優先權逐個降低。該算法賦予各個隊列中進程執行時間片的大小也各不相同,在優先權愈高的隊列中,為每個進程所規定的執行時間片就愈小。例如,第二個隊列的時間片要比第一個隊列的時間片長一倍,……,第i+1個隊列的時間片要比第i個隊列的時間片長一倍。 (2) 當一個新進程進入內存后,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調度。當輪到該進程執行時,如它能在該時間片內完成,便可準備撤離系統;如果它在一個時間片結束時尚未完成,調度程序便將該進程轉入第二隊列的末尾,再同樣地按FCFS原則等待調度執行;如果它在第二隊列中運行一個時間片后仍未完成,再依次將它放入第三隊列,……,如此下去,當一個長作業(進程)從第一隊列依次降到第n隊列后,在第n 隊列便采取按時間片輪轉的方式運行。(3) 僅當第一隊列空閑時,調度程序才調度第二隊列中的進程運行;僅當第1~(i-1)隊列均空時,才會調度第i隊列中的進程運行。如果處理機正在第i隊列中為某進程服務時,又有新進程進入優先權較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進程將搶占正在運行進程的處理機,即由調度程序把正在運行的進程放回到第i隊列的末尾,把處理機分配給新到的高優先權進程。 ''' 進程調度算法進程的并行與并發
并行?:?并行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核的CPU )
并發?:?并發是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A ,交替使用,目的是提高效率。
區別:
并行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
并發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個服務器同時處理多個session。
同步異步阻塞非阻塞
在了解其他概念之前,我們首先要了解進程的幾個狀態。在程序運行的過程中,由于被操作系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。
(1)就緒(Ready)狀態
當進程已分配到除CPU以外的所有必要的資源,只要獲得處理機便可立即執行,這時的進程狀態稱為就緒狀態。
(2)執行/運行(Running)狀態當進程已獲得處理機,其程序正在處理機上執行,此時的進程狀態稱為執行狀態。
(3)阻塞(Blocked)狀態正在執行的進程,由于等待某個事件發生而無法執行時,便放棄處理機而處于阻塞狀態。引起進程阻塞的事件可有多種,例如,等待I/O完成、申請緩沖區不能滿足、等待信件(信號)等。
#長須在開始運行之后,并不是立即開始執行代碼會進入就緒狀態,等待操作系統調度運行 import time #程序運行狀態 print('start')name = input('>>>') #程序進入阻塞狀態print(name) #運行 time.sleep(1) #繼續阻塞print('finish') #運行 #結束操作同步和異步
?? ??所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。
所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了。至于被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列。
阻塞與非阻塞
? ? ??阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的
同步/異步與阻塞/非阻塞
效率最低。拿上面的例子來說,就是你專心排隊,什么別的事都不做。
如果在銀行等待辦理業務的人采用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間里他不能離開銀行做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;
異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。
實際上是效率低下的。
想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。
效率更高,
因為打電話是你(等待者)的事情,而通知你則是柜臺(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。
比如說,這個人突然發覺自己煙癮犯了,需要出去抽根煙,于是他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了。
很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來,同樣的,很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的IO操作處被阻塞。
進程的創建與結束
進程的創建
但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。
而對于通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程:
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,后臺運行的進程與用戶無關,運行在后臺并且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)
4. 一個批處理作業的初始化(只在大型機的批處理系統中應用)
無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用于創建進程的系統調用而創建的。
進程的結束
1. 正常退出(自愿,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自愿,python a.py中a.py不存在)
3. 嚴重錯誤(非自愿,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)
4. 被其他進程殺死(非自愿,如kill -9)
在python程序中的進程操作
multiprocess模塊
? 仔細說來,multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由于提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。
multiprocess.process模塊
process模塊介紹
process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號參數介紹: 1 group參數未使用,值始終為None 2 target表示調用對象,即子進程要執行的任務 3 args表示調用對象的位置參數元組,args=(1,2,'egon',) 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 5 name為子進程的名稱方法介紹 1 p.start():啟動進程,并調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程 屬性介紹 1 p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)在windows中使用process模塊的注意事項在Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。 process介紹使用process模塊創建進程
''' 查看進程號getpid() and getppid() ''' import os from multiprocessing import Processdef f(x):print('子進程id :',os.getpid(),'父進程id :',os.getppid())return x*xif __name__ == '__main__':print('主進程id :', os.getpid())p_lst = []for i in range(5):p = Process(target=f, args=(i,))p.start() 查看進程號多個進程同時運行
import time from multiprocessing import Processdef f(name):print('hello', name)time.sleep(1)if __name__ == '__main__':p_lst = []for i in range(5):p = Process(target=f, args=('bob',))p.start()p_lst.append(p) 多個進程同時運行 import time from multiprocessing import Processdef f(name):print('hello', name)time.sleep(1)if __name__ == '__main__':p_lst = []for i in range(5):p = Process(target=f, args=('bob',))p.start()p_lst.append(p)p.join()# [p.join() for p in p_lst]print('父進程在執行') 多個進程同時運行,join方法 import os from multiprocessing import Processclass MyProcess(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print(os.getpid())print('%s 正在和女主播聊天' %self.name)p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha')p1.start() #start會自動調用run p2.start() # p2.run() p3.start()p1.join() p2.join() p3.join()print('主線程')通過繼承Process類開啟進程 通過繼承Process類開啟進程 from multiprocessing import Processdef work():global nn=0print('子進程內: ',n)if __name__ == '__main__':n = 100p=Process(target=work)p.start()print('主進程內: ',n) 進程之間的數據隔離問題socket聊天并發協議
基于FTP協議
import socket from multiprocessing import Processdef talk(conn):conn.send(b'connected')ret = conn.recv(1024)print(ret)if __name__ == '__main__':sk = socket.socket()sk.bind(('127.0.0.1',8080))sk.listen()while True:conn,addr = sk.accept()Process(target=talk,args=(conn,))p.start()conn.close()sk.close() FTP—server端 import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024) print(ret) msg = input('>>>') sk.send(msg.encode('utf-8'))sk.close() FTP—client端基于UDP協議
from socket import * from multiprocessing import Processserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__': #windows下start進程一定要寫到這下面while True:conn,client_addr=server.accept()p=Process(target=talk,args=(conn,client_addr))p.start() UDP—server端 from socket import *client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8')) UDP—client端守護進程
會隨著主進程的結束而結束。
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
import time from multiprocessing import Process def func():print('--'*10)time.sleep(15)print('--'*10)def cla_time():while True:time.sleep(1)print('過去了1秒')if __name__ == '__main__':p = Process(target=cal_time)p.daemon = True #一定在開啟進程之前設置 p.start()p2 = Process(target=func)p2.start()for i in range(10):time.sleep(0.1)print('*'*i)p2.join() ''' 守護進程的進程的作用會隨著主進程的代碼執行結束而結束 守護進程 要在start之前設置 守護進程 不能再開啟子進程 '''多進程中的其他方法
is_alive :是否活著 :True代表進程在,False代表進程不存在
terminate:結束一個進程 但是這個進程不會立即結束
import time from multiprocessing import Process def func():print('wahaha')time.sleep(5)print('qqxing') if __name__ == '__main__':p = Process(target=func)p.start()print(p.is_alive())time.sleep(0.1)p.terminate()print(p.is_alive())time.sleep(1)print(p.is_alive()) 進程對象的其他方法:terminate,is_alive import time from multiprocessing import Process def func():print('wahaha')time.sleep(5)print('qqxing') if __name__ == '__main__':p = Process(target=func)p.start()print(p.name,p.pid)p.name = '哇哈哈哈’print(p.name)import time from multiprocessing import Process class MyProcess(Process) def run(self):print('wahaha',self.name,self.pid)time.sleep(5)print('qqxing',self.name,self.pid) if __name__ == '__main__':p = MyProcess()p.start()print(p.pid) 進程對象的其他屬性:pid和namepid:進程id
name:進程名
進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
鎖 —— multiprocess.Lock
# 由并發變成了串行,犧牲了運行效率,但避免了競爭 import json import time import random from multiprocessing import Lock from multiprocessing import Processdef search(i):with open('ticket') as f:print(i,json.load(f)['count'])def get(i):with open('ticket') as f:ticket_num = json.load(f)['count']time.sleep(random.random())if ticket_num > 0:with open('ticket','w') as f:json.dump({'count':ticket_num-1},f)print('%s買到票了'%i)else:print('%s沒票了'%i) def task(i,lock):search(i) #查看票 lock.acquire()get(i) #搶票 lock.release() if __name__ == '__main__':lock = Lock()for i in range(20):p = Process(target=func,args=(i,lock))p.start() 搶票例題 #文件db的內容為:{"count":5} #注意一定要用雙引號,不然json無法識別 #并發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db'))time.sleep(random.random()) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(random.random()) #模擬寫數據的網絡延遲json.dump(dic,open('db','w'))print('\033[32m購票成功\033[0m')else:print('\033[31m購票失敗\033[0m')def task(lock):search()lock.acquire()get()lock.release()if __name__ == '__main__':lock = Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start() 優化一下搶票 #加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基于文件,而文件是硬盤上的數據) 2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。 隊列和管道都是將數據存放于內存中 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。信號量 —— multiprocess.Semaphore
from multiprocessing import Process,Semaphore import time,randomdef go_wc(sem,user):sem.acquire()print('%s 占到一間ktv小屋' %user)time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同 sem.release()if __name__ == '__main__':sem=Semaphore(4)p_l=[]for i in range(13):p=Process(target=go_wc,args=(sem,'user%s' %i,))p.start()p_l.append(p)for i in p_l:i.join()print('============》')''' 互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。 假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。 實現: 信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。 信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念 ''' 迷你唱吧事件 —— multiprocess.Event
python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。clear:將“Flag”設置為False set:將“Flag”設置為True ''' import time import random from multiprocessing import Process from multiprocessing import Event def traffic_light(e):while True:if e.is_set():time.sleep(3)print('紅燈亮')e.clear() # 綠變紅else:time.sleep(3)print('綠燈亮')e.set() # 紅變綠def car(i,e):e.wait()print('%s車通過'%i)if __name__ == '__main__':e = Event() # 立一個紅燈tra = Process(target=traffic_light,args=(e,))tra.start() # 啟動一個進程來控制紅綠燈for i in range(100):if i%6 == 0 :time.sleep(random.randint(1,3))car_pro = Process(target=car, args=(i,e))car_pro.start() 紅綠燈 from multiprocessing import Process, Event import time, randomdef car(e, n):while True:if not e.is_set(): # 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色print('\033[31m紅燈亮\033[0m,car%s等著' % n)e.wait() # 阻塞,等待is_set()的值變成True,模擬信號燈為綠色print('\033[32m車%s 看見綠燈亮了\033[0m' % n)time.sleep(random.randint(3, 6))if not e.is_set(): #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始continueprint('車開遠了,car', n)breakdef police_car(e, n):while True:if not e.is_set():# 進程剛開啟,is_set()的值是Flase,模擬信號燈為紅色print('\033[31m紅燈亮\033[0m,car%s等著' % n)e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s之后沒有等到綠燈就闖紅燈走了if not e.is_set():print('\033[33m紅燈,警車先走\033[0m,car %s' % n)else:print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)breakdef traffic_lights(e, inverval):while True:time.sleep(inverval)if e.is_set():print('######', e.is_set())e.clear() # ---->將is_set()的值設置為Falseelse:e.set() # ---->將is_set()的值設置為Trueprint('***********',e.is_set())if __name__ == '__main__':e = Event()for i in range(10):p=Process(target=car,args=(e,i,)) # 創建是個進程控制10輛車 p.start()for i in range(5):p = Process(target=police_car, args=(e, i,)) # 創建5個進程控制5輛警車 p.start()t = Process(target=traffic_lights, args=(e, 10)) # 創建一個進程控制紅綠燈 t.start()print('============》') ...進程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)
隊列?
Queue([maxsize]) 創建共享的進程隊列。 參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。方法介紹 Queue([maxsize]) 創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。q.get_nowait( ) 同q.get(False)方法。q.put(item [, block [,timeout ] ] ) 將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。q.qsize() 返回隊列中目前項目的正確數量。此函數的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。其他方法 q.empty() 如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。q.full() 如果q已滿,返回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,后臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。q.cancel_join_thread() 不會再進程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。q.join_thread() 連接隊列的后臺線程。此方法用于在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。 ''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 '''from multiprocessing import Queue q=Queue(3)#put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。# 如果隊列中的數據一直不被取走,程序就會永遠停在這里。 try:q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。print('隊列已經滿了')# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) #滿了print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。 try:q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。print('隊列已經空了')print(q.empty()) #空了生產者消費者模型
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=(q,))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主') 基于隊列實現生產者消費者模型此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。
from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.put(None) #發送結束信號 if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=(q,))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('主') ''' 首先 對于內存空間來說 每次只有很少的數據會在內存中 對于生產與消費黃子健的不平衡來說增加消費者或者增加生產者來調節效率 ''' 改良版——生產者消費者模型?
轉載于:https://www.cnblogs.com/soleZ/p/8406789.html
總結
以上是生活随笔為你收集整理的python -- 进程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 水一篇博客备份脚本
- 下一篇: 关于Django部分