当前位置: 首页 > 知识库问答 >
问题:

使用python多处理异步启动大量依赖进程

公羊灿
2023-03-14

问题:我有一个类似DAG(有向无环图)的结构,用于在机器上开始执行一些海量数据处理。某些流程只能在其父级数据处理完成时启动,因为存在多级处理。我想首先使用python多处理库在一台机器上处理它的所有内容,然后使用管理器在不同的机器上扩展执行。我以前没有python多处理的经验。有人能建议这是一个好的图书馆吗?如果是,一些基本的实现想法就可以了。如果不是,在python中还有什么可以用来做这件事?

例子:

A-

B-

C-

在上面的例子中我想踢A

附言:抱歉,我不能分享实际数据,因为保密,尽管我试图用这个例子来说明。

共有1个答案

庾君博
2023-03-14

我非常喜欢为这样的事情使用进程和队列。

像这样:

from multiprocessing import Process, Queue
from Queue import Empty as QueueEmpty
import time

#example process functions
def processA(queueA, queueB):
    while True:
        try:
            data = queueA.get_nowait()
            if data == 'END':
                break
        except QueueEmpty:
            time.sleep(2) #wait some time for data to enter queue
            continue
        #do stuff with data
        queueB.put(data)

def processA(queueB, _):
    while True:
        try:
            data = queueB.get_nowait()
            if data == 'END':
                break
        except QueueEmpty:
            time.sleep(2) #wait some time for data to enter queue
            continue
        #do stuff with data

#helper functions for starting and stopping processes
def start_procs(num_workers, target_function, args):
    procs = []
    for _ in range(num_workers):
        p = Process(target=target_function, args=args)
        p.start()
        procs.append(p)
    return procs

def shutdown_process(proc_lst, queue):
    for _ in proc_lst:
        queue.put('END')
    for p in proc_lst:
        try:
            p.join()
        except KeyboardInterrupt:
            break

queueA = Queue(<size of queue> * 3) #needs to be a bit bigger than actual. 3x works well for me
queueB = Queue(<size of queue>)
queueC = Queue(<size of queue>)
queueD = Queue(<size of queue>)

procsA = start_procs(number_of_workers, processA, (queueA, queueB)) 
procsB = start_procs(number_of_workers, processB, (queueB, None))  

# feed some data to processA
[queueA.put(data) for data in start_data]  

#shutdown processes
shutdown_process(procsA, queueA)
shutdown_process(procsB, queueB)

#etc, etc. You could arrange the start, stop, and data feed statements to arrive at the dag behaviour you desire
 类似资料:
  • 我使用的是Python 3.5多处理应用异步。我的代码类似于。我在args中传递一个信息(来自信息的对象)。它有一个名为startTime的数据成员。我希望当myFunc开始运行时,将被写入时间。时间()。问题是主流程中的信息和子流程中的信息不一样<代码>信息。开始时间=时间。myFunc中的time()不会更改主进程中的信息。有没有一个好办法来挽救startTime?谢谢

  • 我正在尝试进行大量的外部服务调用,每个调用都遵循异常处理和有条件的进一步处理。我认为使用内部的. on完成来扩展这个不错的(Scala中带有期货的异步IO)示例会很容易,但似乎我对范围和/或期货有些不理解。有人能给我指出正确的方向吗? 在我的电脑上(Scala 2.10.4 ),这打印出来: 我要(顺序不重要):

  • 问题内容: 我发现在Python 3.4中,用于多处理/线程的库很少:多处理vs线程与asyncio。 但是我不知道使用哪个,或者是“推荐的”。他们做的是同一件事还是不同?如果是这样,则将哪一个用于什么?我想编写一个在计算机上使用多核的程序。但是我不知道我应该学习哪个图书馆。 问题答案: 它们旨在(略有)不同的目的和/或要求。CPython(典型的主线Python实现)仍然具有全局解释器锁,因此多

  • 我遵循了spring批处理文档,无法异步运行我的作业。 因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。 我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。 BatchConfig创建异步JobL

  • 问题内容: 由于缺乏对我想使用的某些库的支持,我将一些Python开发从Windows迁移到Linux开发。我整天的大部分时间都在搞弄依赖关系无所适从。 问题 每当我选择Linux时,无论是通过apt-get,easy_install还是pip进行安装,我通常都会遇到某种依赖问题,通常与开发库有关。我本可以将几天的时间浪费在应该是简单的任务上,而不是编写代码,而要花更长的时间使库工作。 在哪里可以

  • 问题内容: 我目前正在使用node.js应用程序,并且遇到了通常的异步代码问题。 我正在Node的HTTP模块之上实现服务服务器。 该服务器支持(类似表达)路由。例如,我有如下代码: 服务器需要能够承受故障,当传递给任何函数的问题出现时,我不想使整个服务器崩溃。当我编写如下代码时,会发生问题: 我看不到如何在这里捕获错误。我不想因一个服务器端故障而使服务器崩溃,而是要服务500个。 我能够提出的唯