当前位置: 首页 > 面试题库 >

Python多处理池超时

沈子实
2023-03-14
问题内容

我想使用multiprocessing.Pool,但是multiprocessing.Pool不能在超时后中止任务。我找到了解决方案,并对其进行了一些修改。

from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time


def worker(y):
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
    start = time.time()
    while True:
       if time.time() - start >= y:
           break
       time.sleep(0.5)
       # show work progress
       print(y)
    return y


def collect_my_result(result):
    print("Got result {}".format(result))


def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        # Wait timeout seconds for func to complete.
        out = res.get(timeout)
    except TimeoutError:
        print("Aborting due to timeout {}".format(args[1]))
        # kill worker itself when get TimeoutError
        sys.exit(1)
    else:
        return out


def empty_func():
    pass


if __name__ == "__main__":
    TIMEOUT = 4
    util.log_to_stderr(util.DEBUG)
    pool = Pool(processes=4)

    # k - time to job sleep
    featureClass = [(k,) for k in range(20, 0, -1)]  # list of arguments
    for f in featureClass:
        # check available worker
        pool.apply(empty_func)

        # run job with timeout
        abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
        pool.apply_async(abortable_func, args=f, callback=collect_my_result)

    time.sleep(TIMEOUT)
    pool.terminate()
    print("exit")

主要修改-使用 sys.exit(1)
退出工作进程。它杀死了工作进程并杀死了工作线程,但是我不确定这个解决方案是否很好。当进程因正在运行的作业而终止时,我会遇到哪些潜在的问题?


问题答案:

停止正在运行的作业没有隐含的风险,操作系统将负责正确终止进程。

如果您的工作是在文件上进行写操作,则磁盘上可能会有很多被截断的文件。

如果您在数据库上编写或与某个远程进程连接,则也可能会出现一些小问题。

但是,Python标准池不支持超时,并且突然终止进程可能会导致应用程序内部出现异常行为。

Pebble处理池确实支持超时任务。

from pebble import process, TimeoutError

with process.Pool() as pool:
    task = pool.schedule(function, args=[1,2], timeout=5)

    try:
        result = task.get()
    except TimeoutError:
        print "Task: %s took more than 5 seconds to complete" % task


 类似资料:
  • 我有一个图像路径列表,我想在进程或线程之间划分,以便每个进程处理列表的某些部分。处理包括从磁盘加载图像,进行一些计算并返回结果。我正在使用Python 2.7 下面是我如何创建辅助进程 我所面临的问题是,当我在initializer函数中记录初始化时间时,我知道worker不是并行初始化的,而是每个worker都以5秒的间隔初始化,下面是供参考的日志 我尝试过使用将同时启动辅助线程 我知道Wind

  • 问题内容: 是否有用于工作线程的类,类似于多处理模块的类? 我喜欢例如并行化地图功能的简单方法 但是,我希望这样做而不会产生新流程的开销。 我知道。但是,在我的用例中,该函数将是绑定的函数,python包装器将在实际函数调用之前为其释放。 我必须编写自己的线程池吗? 问题答案: 我刚刚发现模块中实际上 有一个基于线程的Pool接口,但是它有些隐藏并且没有正确记录。 可以通过导入 它是使用包装Pyt

  • 我甚至不能使用Python 2.7中运行的多重处理包(使用spyder作为窗口上的用户界面)进行并行处理的最简单的例子,我需要帮助解决这个问题。我已经运行了conda更新,所以所有的包都应该是最新的和兼容的。 即使多处理软件包文档(如下所示)中的第一个示例也不起作用,它会生成4个新进程,但控制台只是挂起。在过去的3天里,我已经尝试了我能找到的一切,但是没有一个不挂起就运行的代码能够将我25%以上的

  • 我有下面的代码片段,它试图在多个子进程之间分割处理。 while循环中的主进程正在调用search函数,如果队列达到阈值计数,则处理池将映射到进程函数,其中作业来自队列。我的问题是,python多处理池是在执行期间阻塞主进程,还是立即继续执行?我不想遇到这样的情况,“has_jobs_to_process()”的计算结果为true,并且在处理作业的过程中,另一组作业的计算结果为true,并且再次调

  • 按预期更正代码:从多处理导入池导入信号导入时间导入操作系统 ================================================== 我发现了问题,当我使用map函数时,主func被阻塞,只有map函数被funish时才会调用信号处理程序。所以,使用"map_async"函数来解决这个问题要好得多。 以下是我的发现: 纯用C语言实现的长时间运行的计算(例如对大量文本进行

  • 问题内容: 如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的示例: 当运行上面的代码时,按时会引发,但是该过程只是在此时挂起,我必须在外部将其杀死。 我希望能够随时按下并导致所有进程正常退出。 问题答案: 这是一个Python错误。等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt。复制: 直到wa