python 管道、队列_python
Pipe()只能有兩個(gè)端點(diǎn)。
Queue()可以有多個(gè)生產(chǎn)者和消費(fèi)者。
何時(shí)使用它們
如果您需要兩個(gè)以上的點(diǎn)進(jìn)行通信,請(qǐng)使用Queue() 。
如果你需要絕對(duì)性能, Pipe()要快得多,因?yàn)镼ueue()是建立在Pipe()之上的。
績(jī)效基準(zhǔn)
假設(shè)您想要生成兩個(gè)進(jìn)程并盡快在它們之間發(fā)送消息。 這些是使用Pipe()和Queue()類似測(cè)試之間的拖拽競(jìng)賽的時(shí)間結(jié)果......這是在運(yùn)行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上。
僅供參考,我將JoinableQueue()結(jié)果作為獎(jiǎng)勵(lì)投入; JoinableQueue()在queue.task_done()時(shí)會(huì)queue.task_done()任務(wù)(它甚至不知道特定任務(wù),它只計(jì)算隊(duì)列中未完成的任務(wù)),因此queue.join()知道工作已完成。
這個(gè)答案底部的每個(gè)代碼......
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
總之, Pipe()比Queue()快三倍。 除非你真的必須有好處,否則不要考慮JoinableQueue() 。
獎(jiǎng)金材料2
多處理引入了信息流的細(xì)微變化,除非您知道一些快捷方式,否則會(huì)使調(diào)試變得困難。 例如,在許多條件下通過(guò)字典索引時(shí),您可能有一個(gè)可正常工作的腳本,但在某些輸入中很少失敗。
通常我們會(huì)在整個(gè)python進(jìn)程崩潰時(shí)找到失敗的線索; 但是,如果多處理功能崩潰,則不會(huì)將未經(jīng)請(qǐng)求的崩潰回溯打印到控制臺(tái)。 追蹤未知的多處理崩潰是很困難的,沒(méi)有一個(gè)線索來(lái)解決崩潰的過(guò)程。
我發(fā)現(xiàn)追蹤多處理崩潰信息的最簡(jiǎn)單方法是將整個(gè)多處理函數(shù)包裝在try / except并使用traceback.print_exc() :
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
現(xiàn)在,當(dāng)您發(fā)現(xiàn)崩潰時(shí),您會(huì)看到以下內(nèi)容:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(task_q, result_q)
File "foo.py", line 46, in run
raise ValueError
ValueError
源代碼:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
總結(jié)
以上是生活随笔為你收集整理的python 管道、队列_python的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Android Wear 上位:Goog
- 下一篇: 联想-V470 Ubuntu 屏幕亮度设