我想使用multiprocessing.Pool,但是multiprocessing.Pool不能在超时后中止任务。我找到了解决方案,并对其进行了一些修改。
from multiprocessing import util, Pool, TimeoutError
from multiprocessing.dummy import Pool as ThreadPool
import threading
import sys
from functools import partial
import time
def worker(y):
print("worker sleep {} sec, thread: {}".format(y, threading.current_thread()))
start = time.time()
while True:
if time.time() - start >= y:
break
time.sleep(0.5)
# show work progress
print(y)
return y
def collect_my_result(result):
print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
timeout = kwargs.get('timeout', None)
p = ThreadPool(1)
res = p.apply_async(func, args=args)
try:
# Wait timeout seconds for func to complete.
out = res.get(timeout)
except TimeoutError:
print("Aborting due to timeout {}".format(args[1]))
# kill worker itself when get TimeoutError
sys.exit(1)
else:
return out
def empty_func():
pass
if __name__ == "__main__":
TIMEOUT = 4
util.log_to_stderr(util.DEBUG)
pool = Pool(processes=4)
# k - time to job sleep
featureClass = [(k,) for k in range(20, 0, -1)] # list of arguments
for f in featureClass:
# check available worker
pool.apply(empty_func)
# run job with timeout
abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT)
pool.apply_async(abortable_func, args=f, callback=collect_my_result)
time.sleep(TIMEOUT)
pool.terminate()
print("exit")
主要修改-使用 sys.exit(1)
退出工作进程。它杀死了工作进程并杀死了工作线程,但是我不确定这个解决方案是否很好。当进程因正在运行的作业而终止时,我会遇到哪些潜在的问题?
停止正在运行的作业没有隐含的风险,操作系统将负责正确终止进程。
如果您的工作是在文件上进行写操作,则磁盘上可能会有很多被截断的文件。
如果您在数据库上编写或与某个远程进程连接,则也可能会出现一些小问题。
但是,Python标准池不支持超时,并且突然终止进程可能会导致应用程序内部出现异常行为。
Pebble处理池确实支持超时任务。
from pebble import process, TimeoutError
with process.Pool() as pool:
task = pool.schedule(function, args=[1,2], timeout=5)
try:
result = task.get()
except TimeoutError:
print "Task: %s took more than 5 seconds to complete" % task
我有一个图像路径列表,我想在进程或线程之间划分,以便每个进程处理列表的某些部分。处理包括从磁盘加载图像,进行一些计算并返回结果。我正在使用Python 2.7 下面是我如何创建辅助进程 我所面临的问题是,当我在initializer函数中记录初始化时间时,我知道worker不是并行初始化的,而是每个worker都以5秒的间隔初始化,下面是供参考的日志 我尝试过使用将同时启动辅助线程 我知道Wind
问题内容: 是否有用于工作线程的类,类似于多处理模块的类? 我喜欢例如并行化地图功能的简单方法 但是,我希望这样做而不会产生新流程的开销。 我知道。但是,在我的用例中,该函数将是绑定的函数,python包装器将在实际函数调用之前为其释放。 我必须编写自己的线程池吗? 问题答案: 我刚刚发现模块中实际上 有一个基于线程的Pool接口,但是它有些隐藏并且没有正确记录。 可以通过导入 它是使用包装Pyt
我甚至不能使用Python 2.7中运行的多重处理包(使用spyder作为窗口上的用户界面)进行并行处理的最简单的例子,我需要帮助解决这个问题。我已经运行了conda更新,所以所有的包都应该是最新的和兼容的。 即使多处理软件包文档(如下所示)中的第一个示例也不起作用,它会生成4个新进程,但控制台只是挂起。在过去的3天里,我已经尝试了我能找到的一切,但是没有一个不挂起就运行的代码能够将我25%以上的
我有下面的代码片段,它试图在多个子进程之间分割处理。 while循环中的主进程正在调用search函数,如果队列达到阈值计数,则处理池将映射到进程函数,其中作业来自队列。我的问题是,python多处理池是在执行期间阻塞主进程,还是立即继续执行?我不想遇到这样的情况,“has_jobs_to_process()”的计算结果为true,并且在处理作业的过程中,另一组作业的计算结果为true,并且再次调
按预期更正代码:从多处理导入池导入信号导入时间导入操作系统 ================================================== 我发现了问题,当我使用map函数时,主func被阻塞,只有map函数被funish时才会调用信号处理程序。所以,使用"map_async"函数来解决这个问题要好得多。 以下是我的发现: 纯用C语言实现的长时间运行的计算(例如对大量文本进行
问题内容: 如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的示例: 当运行上面的代码时,按时会引发,但是该过程只是在此时挂起,我必须在外部将其杀死。 我希望能够随时按下并导致所有进程正常退出。 问题答案: 这是一个Python错误。等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt。复制: 直到wa