Python的多处理程序包中的队列和管道之间的根本区别是什么?
在什么情况下应该选择一种?什么时候使用比较有利Pipe()
?什么时候使用比较有利Queue()
?
APipe()
只能有两个端点。
一个Queue()
可以有多个生产者和消费者。
何时使用它们
如果您需要两个以上的交流点,请使用Queue()
。
如果您需要绝对的性能,那么aPipe()
会更快,因为Queue()
它建立在之上Pipe()
。
绩效基准
假设您要生成两个进程并在它们之间尽快发送消息。这些是使用Pipe()
和进行类似测试之间的拖动竞赛的计时结果Queue()
。这是在运行Ubuntu
11.10和Python 2.7.2的ThinkpadT61上进行的。
仅供参考,我把成绩JoinableQueue()
作为奖金;JoinableQueue()
在queue.task_done()
调用时说明任务(它甚至不知道特定任务,它只计算队列中未完成的任务),以便queue.join()
知道工作已完成。
此答案底部的每个代码…
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
概括而言,Pipe()
它的速度比速度快3倍Queue()
。JoinableQueue()
除非您确实必须有好处,否则请不要考虑。
奖励材料2
除非您知道一些捷径,否则多处理会在信息流中引入微妙的变化,使调试变得困难。例如,在许多情况下,当您通过字典建立索引时,您的脚本可能工作正常,但是某些输入很少会失败。
通常,当整个python进程崩溃时,我们会获得有关失败的线索;但是,如果多处理功能崩溃,则不会在控制台上打印未经请求的崩溃回溯。很难找到未知的多处理崩溃,而又不知道导致进程崩溃的线索。
我发现跟踪多处理崩溃信息的最简单方法是将整个多处理功能包装在try
/中except
并使用traceback.print_exc()
:
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
现在,当您发现崩溃时,您会看到类似以下内容的信息:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
源代码:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请
现在多分支管道作业类型已经成熟,还有什么理由再使用简单的管道作业类型吗?即使您现在只有一个分支,考虑到未来多个分支的可能性可能是明智的,那么假设您将Jenkins管道存储在SCM中,那么为您的Jenkins管道使用管道作业类型与始终使用多分支管道作业类型的动机是什么?现在这两种作业类型之间是否存在功能平价?
问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:
问题内容: 我正在尝试模拟使用扭曲运行的应用程序网络。作为仿真的一部分,我希望同步某些事件,并能够为每个进程提供大量数据。我决定使用多处理事件和队列。但是,我的过程变得异常混乱。 我在下面编写了示例代码来说明问题。具体来说,(大约95%的时间是在我的沙桥机器上), “ run_in_thread”函数完成了,但是直到我按Ctrl- C之后才调用“ print_done”回调。 另外,我可以更改示例
问题内容: 多重处理是python中的强大工具,我想更深入地了解它。我想知道何时使用 常规的 锁和队列,何时使用多处理管理器在所有进程之间共享它们。 我提出了以下测试方案,其中包含四种不同的条件进行多处理: 使用池和 NO Manager 使用池和管理器 使用单个流程和 NO Manager 使用单个流程和一个经理 工作 所有条件都执行作业功能。包括一些通过锁固定的打印件。此外,该函数的输入只是放
问题内容: 我正在使用以下node.js代码从某些url下载文档并将其保存在磁盘中。我想知道何时下载该文档。我没有看到使用pipe进行任何回调。或者,下载完成后是否可以捕获任何“结束”事件? 问题答案: 流是,因此您可以听某些事件。如您所说,有一个请求事件(以前是)。 有关可用的事件的更多信息,请查看流文档页面。