当前位置: 首页 > 面试题库 >

上下文管理器和多处理池

牧宁
2023-03-14
问题内容

假设您正在使用一个multiprocessing.Pool对象,并且正在使用initializer构造函数的设置来传递一个初始化函数,然后该初始化函数将在全局命名空间中创建资源。假设资源具有上下文管理器。如果上下文管理的资源必须在流程的整个生命周期中都可以使用,但是在最后要进行适当的清理,您将如何处理它的生命周期呢?

到目前为止,我有点像这样:

resource_cm = None
resource = None


def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

从这里开始,池进程可以使用资源。到现在为止还挺好。但是,由于multiprocessing.Pool类不提供destructoror或deinitializer参数,因此处理清理工作有些棘手。

我的想法之一是使用atexit模块,然后在初始化程序中注册清除程序。像这样:

def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        resource_cm.__exit__()

    import atexit
    atexit.register(_clean_up)

这是一个好方法吗?有没有更简单的方法可以做到这一点?

编辑:atexit似乎不起作用。至少在上面我没有用它的方式,所以到目前为止,我仍然没有解决这个问题的方法。


问题答案:

首先,这是一个非常好的问题!在深入研究multiprocessing代码后,我想我已经找到了一种方法:

当您启动时multiprocessing.Pool,该Pool对象会在内部multiprocessing.Process为池中的每个成员创建一个对象。当这些子流程启动时,它们会调用一个_bootstrap函数,如下所示:

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0 
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

run方法是实际运行给target您的Process对象的方法。对于一个Pool进程,它是一个具有长时间运行的while循环的方法,该循环等待工作项通过内部队列进入。对我们真正有趣的是:被调用
之后 发生了什么。self.run``util._exit_function()

事实证明,该函数进行了一些清理,听起来很像您要寻找的内容:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
    _run_finalizers(0)

这是的文档字符串_run_finalizers

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''

该方法实际上遍历终结器回调的列表并执行它们:

items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()

完善。那么我们如何进入_finalizer_registry?有一个未记录的对象Finalizemultiprocessing.util该对象负责向注册表添加回调:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!

好的,因此将它们放到一个示例中:

import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

def _worker_init(args):
    global resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer
    Finalize(resource, resource.__exit__, exitpriority=16)

def hi(*args):
    print("we're in the worker")

if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()

输出:

calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>

如您所见__exit__,当我们共享资源时,我们所有的工作人员都会被调用join()



 类似资料:
  • 本文向大家介绍Python多个上下文管理器,包括了Python多个上下文管理器的使用技巧和注意事项,需要的朋友参考一下 示例 您可以同时打开多个内容管理器: 它与嵌套上下文管理器具有相同的效果:            

  • 要使人晓得智慧和训诲,分辨通达的言语。使人处事,领受智慧、仁义、公平、正直的训诲。使愚人灵明、使少年人有知识和谋略。使智慧人听见、增长学问、使聪明人得着智谋、使人明白箴言和譬喻、懂得智慧人的言词和谜语。敬畏耶和华使知识的开端,愚妄人藐视智慧和训诲。 上下文管理器 在《文件(1)》中提到,如果要打开文件,一种比较好的方法是使用with语句,因为这种方法,不仅结构简单,更重要的是不用再单独去判断某种异

  • Context managers for use with the with statement. 注解 When using Python 2.5, you will need to start your fabfile with from __future__ import with_statement in order to make use of the with statement (w

  • 问题内容: 我不知道为什么,但是每当我尝试传递到共享对象共享自定义类对象的方法时,都会收到此奇怪的错误。python版本:3.6.3 码: 错误: 这是什么问题 问题答案: 在这里找到了临时解决方案。我已经设法通过在 multiprocessing \ managers.py中 的AutoProxy的初始化程序中添加必要的关键字来解决此问题,但是我不知道此kwarg是否负责任何事情。

  • 问题内容: 我有一个异步API,用于连接邮件并将其发送到SMTP服务器,该服务器具有一些设置并已删除。因此,它非常适合使用Python 3的。 虽然,我不知道是否可以写,因为它们都使用生成器语法来写。 这可能证明了这个问题(包含yield-base和async-await语法的混合,以演示异步调用和上下文管理器的yield之间的区别)。 目前在python中可能发生这种情况吗?以及如何使用该 语句

  • 本文向大家介绍Python上下文管理器类和上下文管理器装饰器contextmanager用法实例分析,包括了Python上下文管理器类和上下文管理器装饰器contextmanager用法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python上下文管理器类和上下文管理器装饰器contextmanager用法。分享给大家供大家参考,具体如下: 一. 什么是上下文管理器 上下文管理