当前位置: 首页 > 面试题库 >

了解多处理:Python中的共享内存管理,锁和队列

韩刚洁
2023-03-14
问题内容

多重处理是python中的强大工具,我想更深入地了解它。我想知道何时使用 常规的
锁和队列,何时使用多处理管理器在所有进程之间共享它们。

我提出了以下测试方案,其中包含四种不同的条件进行多处理:

  1. 使用池和 NO Manager

  2. 使用池和管理器

  3. 使用单个流程和 NO Manager

  4. 使用单个流程和一个经理

工作

所有条件都执行作业功能the_jobthe_job包括一些通过锁固定的打印件。此外,该函数的输入只是放入队列中(以查看是否可以从队列中恢复它)。该输入是一个简单的索引idxrange(10)在称为主脚本创建start_scenario(在底部示出)。

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

条件的成功定义为从队列中完全调用输入,请参见read_queue底部的功能。

条件

条件1和2是不言自明的。条件1涉及创建锁和队列,并将它们传递给进程池:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(辅助功能make_iterator在本文的底部给出。)条件1失败,显示为RuntimeError: Lock objects should only be shared between processes through inheritance

条件2相当相似,但是现在锁定和队列在管理者的监督下:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

在条件3中,手动启动新进程,并且在没有管理器的情况下创建锁和队列:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

条件4相似,但再次使用管理器:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

在3和4两种情况下,我针对10个任务中的每一个都启动一个新进程the_job,最多 同时 运行 ncores个
进程。这是通过以下辅助功能实现的:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

结果

仅条件1失败(RuntimeError: Lock objects should only be shared between processes through inheritance),而其他3个条件成功。我尽力解决这个问题。

为什么池需要在所有进程之间共享锁和队列,而条件3中的单个进程却不需要?

我知道的是,对于池条件(1和2),来自迭代器的所有数据都是通过酸洗传递的,而在单进程条件(3和4)中,来自迭代器的所有数据都是通过对主进程的继承来传递的(我是使用
Linux
)。我猜直到在子进程中更改内存之前,都会访问父进程使用的相同内存(写时复制)。但是,只要一说完lock.acquire(),就应该更改它,子进程确实使用放在内存中其他位置的不同锁,不是吗?一个孩子进程如何知道一个兄弟激活了一个不能通过管理员共享的锁?

最后,我的问题有点相关:条件3和4有多少不同。两者都有各自的流程,但在管理人员的用法上有所不同。两者都被视为 有效
代码吗?或者,如果实际上不需要经理,应该避免使用经理吗?

完整剧本

对于那些只想复制并粘贴所有内容以执行代码的人,下面是完整的脚本:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

问题答案:

multiprocessing.Lock使用操作系统提供的信号量对象实现。在Linux上,子级只是通过继承了父级的信号量句柄os.fork。这不是信号量的副本。它实际上继承了父级具有的相同句柄,可以继承文件描述符的相同方式。另一方面,Windows不支持os.fork,因此必须使Windows处于“腌制”状态Lock。它multiprocessing.Lock使用Windows
DuplicateHandleAPI通过创建对象内部使用的Windows信号灯的重复句柄来实现此目的,该API指出:

重复的句柄引用与原始句柄相同的对象。因此,对对象的任何更改都会通过两个手柄反映出来

DuplicateHandleAPI允许您将重复句柄的所有权交给子进程,以便子进程在取消选择之后实际上可以使用它。通过创建由孩子拥有的重复句柄,您可以有效地“共享”锁对象。

这是中的信号量对象 multiprocessing/synchronize.py

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

请注意assert_spawningin中的调用__getstate__,该调用在腌制对象时被调用。这是如何实现的:

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

该函数可以确保您Lock通过调用来“继承” the函数thread_is_spawning。在Linux上,该方法仅返回False

@staticmethod
def thread_is_spawning():
    return False

这是因为Linux不需要腌制即可继承Lock,因此,如果__getstate__实际上是在Linux上调用它,则我们一定不能继承。在Windows上,还有更多操作:

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

在这里,thread_is_spawning返回True如果Popen._tls对象具有process_handle的属性。我们可以看到process_handle在中创建了属性__init__,然后使用将要继承的数据从父级传递给子级dump,然后删除了该属性。所以thread_is_spawning只会在True期间__init__。根据这个python-
ideas邮件列表线程,实际上这是一个人为限制,它被添加来模拟与os.forkLinux上相同的行为。Windows实际上 可以
支持Lock随时通过传递,因为DuplicateHandle可以随时运行。

以上所有内容均适用于该Queue对象,因为它在Lock内部使用。

我想说,继承Lock对象比使用a更可取Manager.Lock(),因为当使用a时Manager.LockLock必须通过IPC将每个对的调用都通过IPC发送到Manager进程,这比使用Lock调用内部的共享要慢得多。处理。不过,这两种方法都是完全有效的。

最后,可以使用/关键字参数将a传递给aLock的所有成员,Pool而无需使用a :Manager``initializer``initargs

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

之所以可行,是因为传递的参数传递给在中运行的对象initargs__init__方法,因此它们最终被继承而不是被腌制。Process``Pool



 类似资料:
  • 问题内容: 我有三个大名单。前一个包含位数组(模块位数组0.8.0),另外两个包含整数数组。 这些数据结构占用相当多的RAM(总计约16GB)。 如果我使用以下方法启动12个子流程: 这是否意味着将为每个子流程复制l1,l2和l3,或者子流程将共享这些列表?或者更直接地说,我将使用16GB还是192GB的RAM? someFunction将从这些列表中读取一些值,然后根据读取的值执行一些计算。结果

  • 问题内容: 我在多处理模块上遇到了麻烦。我正在使用具有其map方法的工作人员池从大量文件中加载数据,对于每个文件,我都使用自定义函数来分析数据。每次处理文件时,我都希望更新一个计数器,以便可以跟踪还有多少文件需要处理。这是示例代码: 我找不到解决方案。 问题答案: 问题在于该变量未在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。 有关可用于在进程之间共享状态的某些技术,请

  • 问题内容: 第一个问题是Value和Manager()。Value有什么区别? 其次,是否可以不使用Value共享整数变量?下面是我的示例代码。我想要的是获取一个整数值而不是Value的字典。我所做的就是在此过程之后全部更改。有没有更简单的方法? 问题答案: 使用时,您会在共享内存中获得一个对象,默认情况下,该对象使用进行同步。使用该对象时,您将得到一个控制服务器进程的对象,该服务器进程允许对象值

  • 问题内容: Java程序员知道JVM运行垃圾回收器,而System.gc()只是建议JVM运行垃圾回收器。如果我们使用System.gc(),并不一定会立即运行GC。 如果我误解了Java的垃圾收集器,请纠正我。 除了依赖Java的Garbage Collector之外,还有其他方法可以进行内存管理吗? 如果您打算通过某种有助于管理内存的编程实践来回答问题,请这样做。 问题答案: 关于Java内存

  • 问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:

  • 问题内容: 我想在共享内存中使用一个numpy数组,以便与多处理模块一起使用。困难是像numpy数组一样使用它,而不仅仅是ctypes数组。 这将产生如下输出: 可以ctypes方式访问该数组,例如arr[i]说得通。但是,它不是一个numpy数组,因此我无法执行,或。我想一个解决方案是将数组转换为数组。但是(除了无法完成这项工作外),我不相信会再共享它。 对于必须解决的常见问题,似乎将有一个标准