当前位置: 首页 > 面试题库 >

是否可以在Pool.imap调用的函数中使用多处理队列?

季华茂
2023-03-14
问题内容

我正在使用python
2.7,并尝试在自己的进程中运行一些CPU繁重的任务。我希望能够将消息发送回父流程,以使其随时了解流程的当前状态。为此,多处理队列似乎很完美,但我不知道如何使它工作。

因此,这是我的基本工作示例,减去了Queue的使用。

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我尝试以几种方式传递队列,它们收到错误消息“
RuntimeError:队列对象仅应通过继承在进程之间共享”。这是我根据之前发现的答案尝试的一种方法。(尝试使用Pool.map_async和Pool.imap时遇到相同的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

最后,“ 0适应度”方法(使其成为全局变量)不会生成任何消息,而只是将其锁定。

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

我知道它可能会直接与multiprocessing.Process一起使用,并且还有其他库可以完成此操作,但是我讨厌放弃非常适合的标准库函数,直到我确定不仅仅是我所缺少的知识使我无法利用它们。

谢谢。


问题答案:

诀窍是将Queue作为参数传递给初始化程序。似乎可以与所有Pool调度方法一起使用。

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()


 类似资料:
  • 我对keycloak不熟悉,对它没有深入的了解。我们必须在Spring Boot中为一个应用程序实现身份验证,其中有团队,用户可以是不同团队的一部分。他/用户可以为每个团队拥有不同的角色/权限。例如,用户A可以是team1的经理,同时,他可以是team2的管理员,等等。 我以一种为每个团队提供一个钥匙斗篷客户端的方式实现了它。客户端角色将作为团队权限分配给用户。 现在,我想以这样一种方式限制API

  • 问题内容: 我想使用Streams.intRange(int start,int end,int step)实现反向排序的流。但是,似乎java.util.Streams类不再可用(但是它仍在标准库的rt.jar中)。此方法是在其他类中还是被其他方法替代? 问题答案: 实际上,JDK中再也没有这种方法了。您能获得的下一个最接近的位置是,但是只会一步一步走。 一种解决方案是实施您自己的解决方案。例如

  • 我仍然是AnyLogic和JAVA的基本用户,但我遇到了获取服务块剩余时间的问题。我知道服务块嵌入了延迟块,并且有一个用于延迟的函数(getRemainingTime(代理))。是否可以在(代理)main中调用此函数? 我需要获得我的产品将停留在服务块的时间,以导航我的推送过程。我有3个产品(都是个人代理)和一个在服务前装配我的产品和1个原材料(个人代理)的拾取块。显然,剩余时间必须是动态的,并且

  • 问题内容: 考虑类型: 现在假设我要遍历一个类实例的集合并在每个实例上调用一些函数: 该语法非常紧凑,但感觉有点尴尬。另外,它不能在任何地方使用。例如,在语句条件中: 理想情况下,写类似 但是从Swift 2.1开始,这不是正确的语法。这种引用该函数的方式将类似于以下内容: 有没有更好的方法来引用实例函数?您如何喜欢写这样的表达式? 问题答案: 这里有两个不同的问题。的 拖尾闭合语法 可以在调用函

  • 问题内容: 我想使用React在整个DOM中多次添加组件。这个小提琴显示了我要执行的操作,并且不会引发任何错误。这是代码: HTML: JS: 我已经看过这个问题了,恐怕通过上述操作,我将冒使React组件相互干扰的风险。该问题的答案建议使用服务器端渲染,这对我来说不是一个选择,因为我正在使用Django服务器端。 另一方面,也许我在做什么就可以了,因为我只安装了一个React库实例(而不是多个组

  • 我的问题是,如果我们有两个原始事件流,即烟雾和温度,并且我们想通过将运算符应用于原始流来找出复杂事件(即火灾)是否发生,我们可以在Flink中做到这一点吗? 我问这个问题是因为到目前为止,我所看到的Flink CEP的所有示例都只包括一个输入流。如果我错了,请纠正我。