当前位置: 首页 > 面试题库 >

numpy vs.多处理和mmap

薛淮晨
2023-03-14
问题内容

我正在使用Python的multiprocessing模块并行处理大型numpy数组。阵列numpy.load(mmap_mode='r')在主进程中使用内存映射。之后,multiprocessing.Pool()分叉该过程(我想)。

一切似乎都正常,除了我得到类似以下内容的行:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored

在单元测试日志中。尽管如此,测试仍然可以通过。

知道那里发生了什么吗?

使用Python 2.7.2,OS X,NumPy 1.6.1。

更新:

经过一些调试后,我将原因找出到使用此内存映射的numpy数组(的一小部分)作为Pool.imap调用输入的代码路径。

显然,“问题”是multiprocessing.Pool.imap将其输入传递给新流程的方式:它使用泡菜。这不适用于mmaped
numpy数组,并且内部的中断会导致错误。

我发现罗伯特·科恩(Robert Kern)的答复似乎解决了同一问题。他建议为imap输入何时来自内存映射的数组创建一个特殊的代码路径:在生成的过程中手动对同一数组进行内存映射。

这将是如此复杂和丑陋,以至于我宁愿忍受该错误和额外的内存副本。还有其他方法可以更轻松地修改现有代码吗?


问题答案:

我通常的方法(如果可以使用额外的内存副本)是在一个进程中完成所有IO,然后将其发送到工作线程池中。要将内存映射数组的切片加载到内存中,只需这样做x = np.array(data[yourslice])data[yourslice].copy()实际上并没有这样做,这可能会引起一些混乱。)。

首先,让我们生成一些测试数据:

import numpy as np
np.random.random(10000).tofile('data.dat')

您可以使用以下方式重现错误:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

而且,如果只是改用让步np.array(data[start:stop]),则可以解决问题:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

当然,这确实为每个块制作了一个额外的内存副本。

从长远来看,您可能会发现从映射文件切换到HDF之类的操作会更容易。如果您的数据是多维的,则尤其如此。(我建议h5py,但是pyTables如果您的数据是“类似表格的”,那就很好了。)

祝你好运,无论如何!



 类似资料:
  • 问题内容: 我不知道为什么,但是每当我尝试传递到共享对象共享自定义类对象的方法时,都会收到此奇怪的错误。python版本:3.6.3 码: 错误: 这是什么问题 问题答案: 在这里找到了临时解决方案。我已经设法通过在 multiprocessing \ managers.py中 的AutoProxy的初始化程序中添加必要的关键字来解决此问题,但是我不知道此kwarg是否负责任何事情。

  • 问题内容: 假设您正在使用一个对象,并且正在使用构造函数的设置来传递一个初始化函数,然后该初始化函数将在全局命名空间中创建资源。假设资源具有上下文管理器。如果上下文管理的资源必须在流程的整个生命周期中都可以使用,但是在最后要进行适当的清理,您将如何处理它的生命周期呢? 到目前为止,我有点像这样: 从这里开始,池进程可以使用资源。到现在为止还挺好。但是,由于类不提供or或参数,因此处理清理工作有些棘

  • 问题内容: 我用python与selenium结合编写了一个脚本,以从其着陆页中抓取不同文章的链接,并通过跟踪引向其内页的url最终获得每个文章的标题。尽管我在这里解析的内容是静态内容,但我还是使用了selenium来查看它在多处理中的工作方式。 但是,我的意图是使用多处理进行抓取。到目前为止,我知道selenium不支持多处理,但似乎我错了。 我的问题:当使用多处理运行selenium时,如何减

  • 问题内容: 我正在使用Keras与Tensorflow作为后端。 我正在尝试在主流程中保存模型,然后在另一个流程中加载/运行(即调用)。 我目前正在尝试从文档中使用天真的方法来保存/加载模型:https : //keras.io/getting-started/faq/#how-can-i-save-a- keras-model 。 所以基本上: 在主要过程中 在子进程中 在子进程中 但是,它只是

  • 问题内容: 有什么方法可以使pip多个版本的Python正常运行吗?例如,我想用于pip将内容显式安装到我的站点2.5安装或站点2.6安装中。 例如,使用,我使用。 而且,是的-我了解,不是-这不是解决此特定问题的方法。 问题答案: 在目前的建议是使用,这里python是Python的版本,你想使用。这是建议,因为它适用于所有版本的和所有形式的。例如: 先前的答案,留给后代: 从0.8版开始,支持

  • 我有一个图像路径列表,我想在进程或线程之间划分,以便每个进程处理列表的某些部分。处理包括从磁盘加载图像,进行一些计算并返回结果。我正在使用Python 2.7 下面是我如何创建辅助进程 我所面临的问题是,当我在initializer函数中记录初始化时间时,我知道worker不是并行初始化的,而是每个worker都以5秒的间隔初始化,下面是供参考的日志 我尝试过使用将同时启动辅助线程 我知道Wind