当前位置: 首页 > 面试题库 >

在Python中的进程之间共享许多队列

梁兴文
2023-03-14
问题内容

我知道multiprocessing.Manager()如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。

但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识key

当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多东西。随着multiprocessing进口为mp

for key in all_keys: DICT[key] = mp.Queue在多处理模块(称为multi.py)导入的配置文件中那样定义dict不会返回错误,但是DICT[key]在进程之间不共享队列,每个进程似乎都有自己的队列副本,因此不会发生通信。

如果我尝试在定义DICT进程并启动它们的主要多处理函数的开始处定义,例如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

我得到错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance

更改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

只会使一切变得更糟。multi.py在main函数的顶部而不是内部尝试类似的定义会返回类似的错误。

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列。有任何想法吗?

编辑

这是程序的基本架构:

1-加载第一个模块,该模块定义一些变量,import
multi,launchsmulti.main()和加载另一个模块,该模块开始一系列模块加载和代码执行。与此同时…

2-multi.main看起来像这样:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
        proc_2 =  pool.apply_async(targ2,(DICT2[key], otherargs,)

除了使用pooland之外manager,我还启动了以下程序:

mp.Process(target=targ1, args=(DICT[key],))

3-该函数targ1接收key来自主进程的输入数据(按排序)。它旨在将结果传递给它,DICT[key]以便targ2可以执行其工作。这是不起作用的部分。有任意数量的targ1s,targ2s等,因此有任意数量的队列。

4-这些过程中的某些过程的结果将被发送到一堆不同的数组/熊猫数据帧,这些数据帧也由索引key,我希望可以从任意过程中访问它们,甚至是在不同模块中启动的过程。我还没有写这部分,这可能是一个不同的问题。(我在这里提到它是因为上面3的答案也可能很好地解决了4。)


问题答案:

当您尝试multiprocessing.Queue()通过传递a作为参数共享a时,听起来好像问题开始了。您可以通过创建托管队列来解决此问题:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()

当使用管理器创建它时,您正在存储并传递一个 代理
到队列,而不是队列本身,因此即使传递给工作进程的对象是复制的,它仍将指向相同的基础数据结构:您的队列。在概念上,它与C / C
++中的指针非常相似。如果以这种方式创建队列,则在启动工作进程时将能够通过它们。

由于您现在可以传递队列,因此不再需要管理字典。在main中保留一个普通字典,该字典将存储所有映射,并且仅为您的工作进程提供所需的队列,因此他们无需访问任何映射。

我在这里写了一个例子。看起来您在工作人员之间传递对象,这就是在这里完成的工作。假设我们有两个处理阶段,并且数据在的控制下开始和结束main。看看我们如何创建像管道一样连接工人的队列,但是通过
给他们提供 他们需要的队列 ,就不需要他们知道任何映射:

import multiprocessing as mp

def stage1(q_in, q_out):

    q_out.put(q_in.get()+"Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):

    q_out.put(q_in.get()+"Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()

代码产生以下输出:

Main开始了工作。
第一阶段做了一些工作。
第二阶段做了一些工作。
Main完成了工作。

我没有提供AsyncResults在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作。但是,既然您可以自由地传递队列,则可以构建字典来根据需要存储队列/进程映射。

实际上,如果您确实在多个工作人员之间建立了一条管道,那么您甚至不需要保留对中“工作人员之间”队列的引用main。创建队列,将其传递给您的工作人员,然后仅保留对main将要使用的队列的引用。如果您确实有“任意数量”的队列,我绝对建议您尝试尽快让旧队列被垃圾回收。



 类似资料:
  • 问题内容: 该模块的文档显示了如何将队列传递给以开头的进程。但是,如何与开始的异步工作进程共享队列?我不需要动态加入或其他任何方式,而只是工人(反复)将其结果报告给基地的一种方法。 失败的原因是: 。我理解这意味着什么,并且我理解继承的建议,而不是要求进行酸洗/酸洗(以及所有Windows特殊限制)。但如何 做 我通过队列的方式,作品?我找不到一个示例,并且我尝试了多种失败的替代方法。请帮忙? 问

  • 问题内容: 我正在尝试使用部分函数,​​以便pool.map()可以定位具有多个参数(在本例中为Lock()对象)的函数。 这是示例代码(摘自我之前的问题的答案): 但是,当我运行此代码时,出现错误: 我在这里想念什么?如何在子流程之间共享锁? 问题答案: 您不能将普通对象传递给方法,因为它们不能被腌制。有两种方法可以解决此问题。一种是创建并传递一个: 不过,这有点重量级;使用需要产生另一个进程来

  • 我想在多个python进程之间共享。以便从其他python进程发送。 如何跨多个python进程共享。 代码如下: 目标是从通道从其他python进程发送。

  • 问题内容: 我有简单的 UDPServer ,它可以与 多处理一起使用 。 我想创建一个列表,其中包含有关所有客户端的信息。 我使用 Manager ,但是我不明白如何在列表中附加信息-我需要传输Manager的对象来处理,但是如何呢?我的新属性方式不起作用。 如何解决?谢谢! 输出: 问题答案: 问题是您要在启动工作进程后立即让主进程完成其执行。当创建过程完成执行后,服务器将关闭,这意味着您的共

  • 问题内容: 我有以下问题。我编写了一个函数,该函数将列表作为输入并为列表中的每个元素创建一个字典。然后,我想将此字典追加到新列表中,以便获得字典列表。我正在尝试为此生成多个进程。我在这里的问题是,我希望不同的进程访问字典列表,因为它由其他进程更新,例如,一旦达到一定长度,就打印一些东西。 我的例子是这样的: 现在我的问题是每个过程都创建自己的过程。有没有一种方法可以在进程之间共享列表,以便所有字典