当前位置: 首页 > 面试题库 >

多处理-共享复杂对象

百里骏
2023-03-14
问题内容

我有一个类似大型dict的对象,需要在多个工作进程之间共享。每个工作人员读取对象中信息的随机子集,并对其进行一些计算。我想避免复制大对象,因为我的机器很快耗尽了内存。

我正在处理此SO问题的代码,并对其进行了一些修改以使用固定大小的进程池,该池更适合于我的用例。然而,这似乎打破了它。

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager

class numeri(object):
    def __init__(self):
        self.nl = []

    def getLen(self):
        return len(self.nl)

    def stampa(self):
        print self.nl

    def appendi(self, x):
        self.nl.append(x)

    def svuota(self):
        for i in range(len(self.nl)):
            del self.nl[0]

class numManager(BaseManager):
    pass

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    return id(listaNumeri)

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                        'svuota', 'stampa'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print id(listaNumeri)

    print '------------ Process'
    for i in range(5):
        producer = Process(target=produce, args=(listaNumeri,))
        producer.start()
        producer.join()

    print '--------------- Pool'
    pool = Pool(processes=1)
    for i in range(5):
        pool.apply_async(produce, args=(listaNumeri,)).get()

if __name__ == '__main__':
    main()

输出是

4315705168
------------ Process
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
--------------- Pool
producing 4299771152
producing 4315861712
producing 4299771152
producing 4315861712
producing 4299771152

如您所见,在第一种情况下,所有工作进程都获得相同的对象(按ID)。在第二种情况下,id不相同。这是否意味着正在复制对象?

PS:我认为这并不重要,但是我正在使用joblib,它在内部使用了Pool

from joblib import delayed, Parallel

print '------------- Joblib'
        Parallel(n_jobs=4)(delayed(produce)(listaNumeri) for i in range(5))

输出:

------------- Joblib
producing 4315862096
producing 4315862288
producing 4315862480
producing 4315862672
producing 4315862352

问题答案:

恐怕这里实际上没有什么能像您 希望的那样 起作用:-(

首先请注意, 不同过程id()产生 相同值 不会
告诉您对象是否真的是同一对象。每个进程都有自己的虚拟地址空间,由操作系统分配。两个进程中的相同虚拟地址可以引用完全不同的 物理
内存位置。您的代码是否产生相同的id()输出几乎完全是偶然的。在多次运行中,有时我会id()在您的Process部分中看到不同的输出,并在您的部分中看到重复的id()输出,Pool反之亦然,或两者兼而有之。

其次,Manager提供 语义 共享而不提供 物理 共享。您numeri实例的数据 仅存
在于管理器进程中。您的所有辅助进程均查看(副本)代理对象。这些是精简包装,用于转发要由管理器进程执行的所有操作。这涉及许多进程间通信以及管理器进程内部的序列化。这是编写速度很慢的代码的好方法;-)是的,只有一个numeri数据副本,但是对数据的所有工作都由一个进程(管理器进程)完成。

为了更清楚地看到这一点,建议进行@martineau的更改,并更改get_list_id()为:

def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)

这是示例输出:

41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608

明确?每次获得相同列表ID的原因 不是 因为每个工作进程 都具有 相同的self.nl成员,而是因为所有numeri方法
都在 单个进程(管理器进程)中运行。这就是列表ID始终相同的原因。

如果您在Linux-
y系统(支持的操作系统fork())上运行,那么更好的主意是,Manager在启动任何工作进程之前,请忘记所有这些东西,并在模块级别创建复杂的对象。然后,工作人员将继承您的复杂对象(地址空间副本)。通常的写时复制fork()语义将使内存效率尽可能高。如果不需要将突变折叠到复杂对象的主程序副本中,这就足够了。如果确实需要将变异折回,那么您又需要进行大量的进程间通信,并且multiprocessing相应地吸引力也就降低了。

这里没有简单的答案。不要开枪;-)



 类似资料:
  • 问题内容: 我有三个大名单。前一个包含位数组(模块位数组0.8.0),另外两个包含整数数组。 这些数据结构占用相当多的RAM(总计约16GB)。 如果我使用以下方法启动12个子流程: 这是否意味着将为每个子流程复制l1,l2和l3,或者子流程将共享这些列表?或者更直接地说,我将使用16GB还是192GB的RAM? someFunction将从这些列表中读取一些值,然后根据读取的值执行一些计算。结果

  • 问题内容: 我在多处理模块上遇到了麻烦。我正在使用具有其map方法的工作人员池从大量文件中加载数据,对于每个文件,我都使用自定义函数来分析数据。每次处理文件时,我都希望更新一个计数器,以便可以跟踪还有多少文件需要处理。这是示例代码: 我找不到解决方案。 问题答案: 问题在于该变量未在您的进程之间共享:每个单独的进程都在创建它自己的本地实例并对其进行递增。 有关可用于在进程之间共享状态的某些技术,请

  • 问题内容: 我正在Python中进行多处理实验,并试图在两个进程之间共享一个字符串数组。这是我的python代码: 运行脚本时,我看到该脚本已正确填充,并且可在中找到,但不能在中找到。结果如下: 我忽略了什么吗? 预先感谢您的反馈。:) 问题答案: 我的猜测是: 存储3个指针。将它们分配给当前进程之外没有意义的内存地址。尝试访问此时包含垃圾邮件的无意义地址。 分配对所有过程都有意义的值似乎有帮助:

  • 问题内容: 第一个问题是Value和Manager()。Value有什么区别? 其次,是否可以不使用Value共享整数变量?下面是我的示例代码。我想要的是获取一个整数值而不是Value的字典。我所做的就是在此过程之后全部更改。有没有更简单的方法? 问题答案: 使用时,您会在共享内存中获得一个对象,默认情况下,该对象使用进行同步。使用该对象时,您将得到一个控制服务器进程的对象,该服务器进程允许对象值

  • 问题内容: 请考虑以下代码: 这给出了输出 但是,我希望它能给 ..因为在调用触发方法时h.id已更改为“ B”。 似乎是在启动单独的进程时创建了主机实例的副本,因此原始主机中的更改不会影响该副本。 在我的项目中(当然,要更详细地说明),主机实例字段有时会更改,并且由在单独的进程中运行的代码触发的事件可以访问这些更改很重要。 问题答案: 多处理在不同的 流程中 运行东西。事物在发送时 不被 复制几

  • 我有一个离散事件流进入我的系统,我需要根据每个事件的内容应用规则。另外,我想对这些流事件应用复杂的事件处理。 约束1.这些规则是用户提供的,并将动态更改。2.每当应用规则时,我不想重新启动我的系统。3.HA 4.只有成熟的开源解决方案 可能的方式...1.在Storm螺栓内运行Esper CEP 2。让口水流到Storm螺栓里 > 这会处理单事件规则和复杂事件吗?规则更改是否需要我的Storm重新