当前位置: 首页 > 面试题库 >

在队列为空之前调用join时,Python 3多处理队列死锁

谭光辉
2023-03-14
问题内容

我在理解multiprocessingpython 3模块中的队列时遇到问题

这就是他们在编程指南中所说的:

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目由“
feeder”线程馈送到基础管道为止。(子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)

这意味着,无论何时使用队列,都需要确保在加入该流程之前,将最终删除所有已放入队列的项目。否则,您无法确定将项目放入队列的进程将终止。还请记住,非守护进程将自动加入。

一个将陷入僵局的示例如下:

从多处理导入过程,队列

定义f(q):
    q.put('X'* 1000000)

如果__name__ =='__main__':
    队列=队列()
    p =进程(目标= f,参数=(队列,))
    p.start()
    p.join()#这个死锁
    obj = queue.get()

此处的解决方法是交换最后两行(或简单地删除p.join()行)。

因此显然,queue.get()不应在之后调用join()

但是,有一些使用队列的示例,其中getjoinlike之后调用该队列:

import multiprocessing as mp
import random
import string

# define a example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                string.ascii_lowercase
                + string.ascii_uppercase
                + string.digits)
    for i in range(length))
        output.put(rand_str)

 if __name__ == "__main__":
     # Define an output queue
     output = mp.Queue()

     # Setup a list of processes that we want to run
     processes = [mp.Process(target=rand_string, args=(5, output))
                    for x in range(2)]

     # Run processes
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()

    # Get process results from the output queue
    results = [output.get() for p in processes]

    print(results)

我已经运行了该程序,并且可以运行。

有人可以帮助我了解僵局的规则在这里吗?


问题答案:

允许数据在进程之间传输的多处理队列实现依赖于标准OS管道。

OS管道的长度不是无限长,因此在put()操作期间,可能会在OS中阻止将数据排队的进程,直到其他一些进程用来get()从队列中检索数据为止。

对于少量数据(例如您的示例中的数据),主流程可以将join()所有产生的子流程然后获取数据。这通常效果很好,但无法扩展,也不清楚何时断开。

但是它肯定会破坏大量数据。子流程将在put()等待主流程通过删除队列中的某些数据时get()被阻止,但是主流程在join()等待子流程完成时被阻塞。这将导致死锁。

这是用户遇到此确切问题的示例。我在答案中张贴了一些代码,帮助他解决了问题。



 类似资料:
  • 我有一个使用Spring Cloud Streams-RabbitMQ在微服务中交换消息的项目。对我的项目至关重要的一件事是,我不能丢失任何信息。 null 我是这些框架的新手,我希望你能帮助配置我的...

  • 今天,我使用Spring Cloud Streams和RabbitMQ,根据本文档编写了以下代码: 我的接口: 和我的属性文件:

  • 问题内容: Python的多处理程序包中的队列和管道之间的根本区别是什么? 在什么情况下应该选择一种?什么时候使用比较有利?什么时候使用比较有利? 问题答案: A只能有两个端点。 一个可以有多个生产者和消费者。 何时使用它们 如果您需要两个以上的交流点,请使用。 如果您需要绝对的性能,那么a会更快,因为它建立在之上。 绩效基准 假设您要生成两个进程并在它们之间尽快发送消息。这些是使用和进行类似测试

  • 问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:

  • 死信队列(Dead Letter Queue)本质上同普通的Queue没有区别,只是它的产生是为了隔离和分析其他Queue(源Queue)未成功处理的消息。 创建死信队列的方法参见createQueue() API,与创建普通队列无异, 死信队列不可调用deadMessage(), deadMessageBatch API,其他操作都与对普通Queue的操作无异。 为了将源Queue的未能成功处理

  • 对于异步的触发器,平台会对函数失败的任务进行最多3次重试。 在新建触发器的时候,为触发器配置一条死信队列,从用户的EMQ队列中选择一条,用于接收函数失败的任务。 在设置死信队列前,请对group: CIf76b0600-24e9-42c4-acf3-d491fbd9fd71​ 授予 FULL_CONTROL 权限,若不授予权限,平台将丢弃失败的任务信息。 消息的内容如下,以后可能增加字段,请用户在