python多进程编程 多个函数并发执行_python并发编程之多进程编程
一、multiprocessing模塊介紹
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內。
二、Process類的介紹
1、創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]])
由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)
強調:
1.需要使用關鍵字的方式來指定參數
2.args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
1.group參數未使用,值始終為None
2.target表示調用對象,即子進程要執行的任務
3.args表示調用對象的位置參數元組,args=(1,2,'egon',)
4.kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5.name為子進程的名稱
方法介紹:
1.p.start():啟動進程,并調用該子進程中的p.run()
2.p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
3.p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
4.p.is_alive():如果p仍然運行,返回True
5.p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹:
1.p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
2 p.name:進程的名稱
3.p.pid:進程的pid
4.p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5.p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
三、Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':下
1、創建并開啟子進程的兩種方式
方式1:用已有的python類
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3)
print('%s is done' %name)
p=Process(target=task,args=('Alex',))
p.start()
print('主函數')
方式2:繼承現有的類,自己寫一個類
from multiprocessing import Process
import time
class task(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print('%s is running' %self.name)
time.sleep(3)
print('%s is done' %self.name)
p=task('Alex')
p.start()
print('主函數')
2、進程直接的內存空間是隔離的
from multiprocessing import Process
x=100
def task():
global x
x=0
print('子進程x值是:',x)
p=Process(target=task)
p.start()
print('主進程x值是:',x)
3、Process對象的join方法
join是主進程等,等待子進程結束
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3)
print('%s is done' %name)
p=Process(target=task,args=('Alex',))
p.start()
p.join()
print('主函數')
4、Process對象的其他方法或屬性(了解)
進程對象的其他方法:terminate,is_alive,pid
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3)
print('%s is done' %name)
p=Process(target=task,args=('Alex',))
p.start()
print(p.pid)
p.terminate() #關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活,過一會就就不存活了
print(p.is_alive())
print('開始')
print(p.is_alive())
5、僵尸進程與孤兒進程:http://blog.51cto.com/10630401/2069930
四、守護進程
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束后就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止。
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3)
print('%s is done' %name)
p=Process(target=task,args=('Alex',))
p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,并且父進程代碼執行結束,p即終止運行
p.start()
print('主函數')
五、進程同步(鎖)
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
1、多個進程共享同一打印終端
代碼1:并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import time,os
def task():
print('%s is running' %os.getpid())
time.sleep(3)
print('%s is done' %os.getpid())
for i in range(3):
p=Process(target=task)
p.start()
代碼2:由并發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import time,os
def task(lock):
lock.acquire()
print('%s is running' %os.getpid())
time.sleep(3)
print('%s is done' %os.getpid())
lock.release()
lock=Lock()
for i in range(3):
p=Process(target=task,args=(lock,))
p.start()
2、模擬購買車票
略(見練習頁)
3、總結
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1)效率低(共享數據基于文件,而文件是硬盤上的數據)
2)需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1)效率高(多個進程共享一塊內存的數據)
2)幫我們處理好鎖問題。
這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。
1)隊列和管道都是將數據存放于內存中
2)隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
六、隊列
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
1、創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
主要方法:
1)、q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。
如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。
如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
2)、q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。
如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
3)、q.get_nowait():同q.get(False)
4)、q.put_nowait():同q.put(False)
5)、q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
6)、q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
7)、q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
其他方法(了解):
1)、q.cancel_join_thread():不會在進程退出時自動連接后臺線程。可以防止join_thread()方法阻塞
2)、q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
3)、q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
應用:
from multiprocessing import Queue
q=Queue(3)
q.put('first')
q.put(2)
q.put({'count':3})
# q.put('fourth',block=False) #q.put_nowait('fourth')
# q.put('fourth',block=True,timeout=3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False)) #q.get_nowait()
# print(q.get(block=True,timeout=3))
七、生產者消費者模型
1、什么是生產者消費者模型
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊。
生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
2、為什么要使用生產者和消費者模型
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。
在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。
同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式
3、生產者與消費者實現
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
print(res)
q.put(res)
print('%s 生產了 %s' %(os.getpid(),res))
q=Queue()
p1=Process(target=producer,args=(q,)) #生產者們:即廚師們
c1=Process(target=consumer,args=(q,)) #消費者們:即吃貨們
p1.start()
c1.start()
print('主')
4、生產者消費者模型總結
1)程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
2)引入生產者消費者模型為了解決的問題是:
平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度
3)如何實現:
生產者隊列消費者
4)生產者消費者模型實現類程序的解耦和
總結
以上是生活随笔為你收集整理的python多进程编程 多个函数并发执行_python并发编程之多进程编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何正确使用烤箱和微波炉?
- 下一篇: 明星餐饮为何总是坚持不了多久?