我在理解multiprocessing
python 3模块中的队列时遇到问题
这就是他们在编程指南中所说的:
请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目由“
feeder”线程馈送到基础管道为止。(子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)这意味着,无论何时使用队列,都需要确保在加入该流程之前,将最终删除所有已放入队列的项目。否则,您无法确定将项目放入队列的进程将终止。还请记住,非守护进程将自动加入。
一个将陷入僵局的示例如下:
从多处理导入过程,队列 定义f(q): q.put('X'* 1000000) 如果__name__ =='__main__': 队列=队列() p =进程(目标= f,参数=(队列,)) p.start() p.join()#这个死锁 obj = queue.get()
此处的解决方法是交换最后两行(或简单地删除p.join()行)。
因此显然,queue.get()
不应在之后调用join()
。
但是,有一些使用队列的示例,其中get
在join
like之后调用该队列:
import multiprocessing as mp
import random
import string
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
if __name__ == "__main__":
# Define an output queue
output = mp.Queue()
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output))
for x in range(2)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
我已经运行了该程序,并且可以运行。
有人可以帮助我了解僵局的规则在这里吗?
允许数据在进程之间传输的多处理队列实现依赖于标准OS管道。
OS管道的长度不是无限长,因此在put()
操作期间,可能会在OS中阻止将数据排队的进程,直到其他一些进程用来get()
从队列中检索数据为止。
对于少量数据(例如您的示例中的数据),主流程可以将join()
所有产生的子流程然后获取数据。这通常效果很好,但无法扩展,也不清楚何时断开。
但是它肯定会破坏大量数据。子流程将在put()
等待主流程通过删除队列中的某些数据时get()
被阻止,但是主流程在join()
等待子流程完成时被阻塞。这将导致死锁。
这是用户遇到此确切问题的示例。我在答案中张贴了一些代码,帮助他解决了问题。
我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...
今天,我使用Spring Cloud Streams和RabbitMQ,根据本文档编写了以下代码: 我的接口: 和我的属性文件:
问题内容: Python的多处理程序包中的队列和管道之间的根本区别是什么? 在什么情况下应该选择一种?什么时候使用比较有利?什么时候使用比较有利? 问题答案: A只能有两个端点。 一个可以有多个生产者和消费者。 何时使用它们 如果您需要两个以上的交流点,请使用。 如果您需要绝对的性能,那么a会更快,因为它建立在之上。 绩效基准 假设您要生成两个进程并在它们之间尽快发送消息。这些是使用和进行类似测试
问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:
死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理
对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在