多进程Queue
進程間通訊
不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue
#將子進程數據傳給主進程
def f(yy):#子進程
yy.put([42, None, 'hello'])
if __name__ == '__main__':#主進程
q = Queue()#生成進程Queue,線程Queue不行
p = Process(target=f, args=(q,))#將Queue傳給子進程
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
#結果:[42, None, 'hello']
另一種方法:
Pipes
ThePipe()function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.send(['I am your song'])
print(conn.recv(),'收到了!')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()#生成管道實例:parent_conn, child_conn分別為管道的兩頭
p = Process(target=f, args=(child_conn,))#管道一頭傳數據
p.start()
print(parent_conn.recv()) #管道另一頭收數據 prints "[42, None, 'hello']"
print(parent_conn.recv()) #['I am your song']
# print(parent_conn.recv()) #如果一頭只發了兩次,另一頭接受之后,還想接受一次,會卡主。比如發幾次,收幾次。
parent_conn.send('我也給你發條信息,你能收到嗎?')
p.join()
結果:
[42, None, 'hello'] ['I am your song'] 我也給你發條信息,你能收到嗎? 收到了!
View Code
以上只實現了數據的傳遞,要實現數據的共享,比如兩個進程同時修改一份數據,該怎么辦呢?
Managers
A manager object returned byManager()controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned byManager()will support typeslist,dict,Namespace(變量),Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,ValueandArray(manager支持很多很多格式的共享)
For example:
from multiprocessing import Process, Manager
import os
#多進程實現數據的共享:不需要加鎖,
#因為manager已經自動加鎖了,不允許多個進程同時修改一份數據,進程數據是獨立的,
#比如共享字典數據,實際已經把字典copy了10(進程數量)份了,最終通過序列化和反序列化擬合在一起,所以數據不會亂。
def f(d, l):
d[1] = '1'#10個進程以同樣的方式修改字典,結果里還是一樣的值
d['2'] = 2
d[0.25] = None
l.append(os.getpid())#放10個進程里每個進程號
print('每個進程操作:',l)
if __name__ == '__main__':
with Manager() as manager:#也可以寫成 manager = Manager(),賦一個變量
d = manager.dict()#生成一個可在多個進程之間傳遞和共享的字典
l = manager.list(range(5))#生成一個可在多個進程之間傳遞和共享的列表
p_list = []#準備以后存多個進程
for i in range(10):#生成10個進程
p = Process(target=f, args=(d, l))#將值傳給d,l
p.start()
p_list.append(p)
for res in p_list:#等待結果
res.join()
print('最終結果:')
print(d)
print(l)
結果:
各個進程操作: [0, 1, 2, 3, 4, 2108]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368]
各個進程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368, 4360]
最終結果:
{1: '1', '2': 2, 0.25: None}
[0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368, 4360]
View Code
總結
- 上一篇: 软件架构乱弹——问题域及其解决方法
- 下一篇: ELDataQuery 基于.NET 2