当前位置: 首页 > 面试题库 >

与多处理事件和队列不兼容吗?

赫连睿
2023-03-14
问题内容

我正在尝试模拟使用扭曲运行的应用程序网络。作为仿真的一部分,我希望同步某些事件,并能够为每个进程提供大量数据。我决定使用多处理事件和队列。但是,我的过程变得异常混乱。

我在下面编写了示例代码来说明问题。具体来说,(大约95%的时间是在我的沙桥机器上), “ run_in_thread”函数完成了,但是直到我按Ctrl-
C之后才调用“ print_done”回调。

另外,我可以更改示例代码中的几项内容,以使此工作更可靠,例如:减少派生的进程数,从reactor_ready调用self.ready.set或更改deferLater的延迟。

我猜想在扭曲的反应堆与阻塞诸如Queue.get()或Event.wait()之类的多处理调用之间存在竞争状况吗?

我遇到的问题到底是什么?我的代码中是否存在我所缺少的错误?我可以解决此问题,还是与多处理事件/队列不兼容?

其次,会推荐像spawnProcess或Ampoule这样的替代方案吗?(如混合处理多线程混合Python中所建议?

编辑(按要求):

我尝试过glib2reactorselectreactor,polreactor和epollreactor的所有反应器都遇到了问题。epollreactor似乎能提供最佳结果,并且在下面给出的示例中似乎可以正常工作,但在我的应用程序中仍然给我带来相同(或相似)的问题。我将继续调查。

我正在运行Gentoo Linux内核3.3和3.4,python 2.7,并且尝试了Twisted
10.2.0、11.0.0、11.1.0、12.0.0和12.1.0。

除了我的沙桥机器之外,我在双核AMD机器上也看到了相同的问题。

#!/usr/bin/python
# -*- coding: utf-8 *-*

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task

from multiprocessing import Process
from multiprocessing import Event

class TestA(Process):
    def __init__(self):
        super(TestA, self).__init__()
        self.ready = Event()
        self.ready.clear()
        self.start()

    def run(self):
        reactor.callWhenRunning(self.reactor_ready)
        reactor.run()

    def reactor_ready(self, *args):
        task.deferLater(reactor, 1, self.node_ready)
        return args

    def node_ready(self, *args):
        print 'node_ready'
        self.ready.set()
        return args

def reactor_running():
    print 'reactor_running'
    df = threads.deferToThread(run_in_thread)
    df.addCallback(print_done)

def run_in_thread():
    print 'run_in_thread'
    for n in processes:
        n.ready.wait()

def print_done(dfResult=None):
    print 'print_done'
    reactor.stop()

if __name__ == '__main__':
    processes = [TestA() for i in range(8)]
    reactor.callWhenRunning(reactor_running)
    reactor.run()

问题答案:

简短的答案是肯定的,Twisted和多重处理彼此不兼容,并且您无法可靠地使用它们。

在所有POSIX平台上,子流程管理与SIGCHLD处理紧密相关。POSIX信号处理程序是全局进程,每种信号类型只能有一个。

Twisted和stdlibmultiprocessing不能都SIGCHLD安装处理程序。其中只有一个可以。这意味着其中只有一个可以可靠地管理子进程。您的示例应用程序无法控制它们中的哪一个将赢得该功能,因此我希望由该事实引起的行为不确定性。

但是,您的示例更直接的问题是,您在父进程中加载​​了Twisted,然后用于multiprocessing派生 而不执行
所有子进程。Twisted不支持这样使用。如果先分叉然后执行,就没有问题。但是,缺少新进程的执行程序(也许是使用Twisted的Python进程)会导致Twisted无法解决的所有额外共享状态。在您的特定情况下,导致此问题的共享状态是用于实现的内部“
waker fd”
deferToThread。在父级和所有子级之间共享fd的情况下,当父级尝试唤醒主线程以传递deferToThread调用结果时,它很可能会唤醒
子级进程之一 。子进程没有什么用处,因此只是浪费时间。同时,父线程中的主线程永远不会唤醒,也永远不会注意到线程任务已完成。

您可以通过在创建子进程之前不加载任何Twisted来避免此问题。就Twisted而言,这将使您的使用变成一个单进程用例(在每个进程中,它将首先被加载,然后该进程将
不再 继续进行分叉,因此毫无疑问如何进行分叉和扭曲互动了)。这意味着,直到创建子进程之后,才导入Twisted。

当然,这只会对Twisted有所帮助。您使用的任何其他库都可能遇到类似的麻烦(您提到了glib2,这是另一个库的一个很好的示例,如果您尝试像这样使用它,它将完全阻塞)。

我强烈建议完全不使用该multiprocessing模块。相反,请使用涉及fork exec的任何多进程方法,
不要单独使用fork。安瓿瓶属于这一类。



 类似资料:
  • 问题内容: Python的多处理程序包中的队列和管道之间的根本区别是什么? 在什么情况下应该选择一种?什么时候使用比较有利?什么时候使用比较有利? 问题答案: A只能有两个端点。 一个可以有多个生产者和消费者。 何时使用它们 如果您需要两个以上的交流点,请使用。 如果您需要绝对的性能,那么a会更快,因为它建立在之上。 绩效基准 假设您要生成两个进程并在它们之间尽快发送消息。这些是使用和进行类似测试

  • 问题内容: 我正在尝试使用Python中的pygame为应用程序制定简单的控件。我已经掌握了基础知识,但是碰到了奇怪的墙:我正在使用箭头键来控制角色。如果我按住一个箭头键,然后按住另一个箭头键(以对角线方向移动),则角色将按预期方式移动。但是,如果我释放了所按下的 第二个 键(同时仍然按住第 一个 键),即使我仍然按住 第一个 键,角色也会停止移动。这是我的简单运动代码: 现在,我自然对此感到非常

  • 问题内容: Sidekiq可以阻止哪些可能的原因来处理队列中的作业?队列已满。日志文件表明完全没有活动。因此,队列已满,但日志为空,Sidekiq似乎未处理项目。似乎没有工人在处理工作。重新启动Redis或用FLUSHALL或FLUSHDB冲洗均无效。Sidekiq已开始于 捆绑执行程序sidekiq -L log / sidekiq.log 并生成以下日志文​​件: 您如何找出问题所在?是否有隐

  • 本文向大家介绍javascript浏览器兼容教程之事件处理,包括了javascript浏览器兼容教程之事件处理的使用技巧和注意事项,需要的朋友参考一下 1. window.event 【分析说明】先看一段代码   以上代码在IE运行的结果是[object],而在Firefox无法运行。   因为在IE中event作为window对象的一个属性可以直接使用,但是在Firefox中却使用了W3C的模型

  • 问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:

  • 我刚才看到了三个方法的文档,当我们在工作线程中工作时,它们可以用来在UI线程中执行一段代码。方法有: > public final void runOnUIThread(Runnable action)-在UI线程上运行指定的操作。如果当前线程是UI线程,则立即执行该操作。如果当前线程不是UI线程,则将操作发布到UI线程的事件队列中 public boolean post(Runnable act