我正在使用Python的multiprocessing
模块并行处理大型numpy数组。阵列numpy.load(mmap_mode='r')
在主进程中使用内存映射。之后,multiprocessing.Pool()
分叉该过程(我想)。
一切似乎都正常,除了我得到类似以下内容的行:
AttributeError("'NoneType' object has no attribute 'tell'",)
in `<bound method memmap.__del__ of
memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>`
ignored
在单元测试日志中。尽管如此,测试仍然可以通过。
知道那里发生了什么吗?
使用Python 2.7.2,OS X,NumPy 1.6.1。
更新:
经过一些调试后,我将原因找出到使用此内存映射的numpy数组(的一小部分)作为Pool.imap
调用输入的代码路径。
显然,“问题”是multiprocessing.Pool.imap
将其输入传递给新流程的方式:它使用泡菜。这不适用于mmap
ed
numpy数组,并且内部的中断会导致错误。
我发现罗伯特·科恩(Robert Kern)的答复似乎解决了同一问题。他建议为imap
输入何时来自内存映射的数组创建一个特殊的代码路径:在生成的过程中手动对同一数组进行内存映射。
这将是如此复杂和丑陋,以至于我宁愿忍受该错误和额外的内存副本。还有其他方法可以更轻松地修改现有代码吗?
我通常的方法(如果可以使用额外的内存副本)是在一个进程中完成所有IO,然后将其发送到工作线程池中。要将内存映射数组的切片加载到内存中,只需这样做x = np.array(data[yourslice])
(data[yourslice].copy()
实际上并没有这样做,这可能会引起一些混乱。)。
首先,让我们生成一些测试数据:
import numpy as np
np.random.random(10000).tofile('data.dat')
您可以使用以下方式重现错误:
import numpy as np
import multiprocessing
def main():
data = np.memmap('data.dat', dtype=np.float, mode='r')
pool = multiprocessing.Pool()
results = pool.imap(calculation, chunks(data))
results = np.fromiter(results, dtype=np.float)
def chunks(data, chunksize=100):
"""Overly-simple chunker..."""
intervals = range(0, data.size, chunksize) + [None]
for start, stop in zip(intervals[:-1], intervals[1:]):
yield data[start:stop]
def calculation(chunk):
"""Dummy calculation."""
return chunk.mean() - chunk.std()
if __name__ == '__main__':
main()
而且,如果只是改用让步np.array(data[start:stop])
,则可以解决问题:
import numpy as np
import multiprocessing
def main():
data = np.memmap('data.dat', dtype=np.float, mode='r')
pool = multiprocessing.Pool()
results = pool.imap(calculation, chunks(data))
results = np.fromiter(results, dtype=np.float)
def chunks(data, chunksize=100):
"""Overly-simple chunker..."""
intervals = range(0, data.size, chunksize) + [None]
for start, stop in zip(intervals[:-1], intervals[1:]):
yield np.array(data[start:stop])
def calculation(chunk):
"""Dummy calculation."""
return chunk.mean() - chunk.std()
if __name__ == '__main__':
main()
当然,这确实为每个块制作了一个额外的内存副本。
从长远来看,您可能会发现从映射文件切换到HDF之类的操作会更容易。如果您的数据是多维的,则尤其如此。(我建议h5py
,但是pyTables
如果您的数据是“类似表格的”,那就很好了。)
祝你好运,无论如何!
问题内容: 我不知道为什么,但是每当我尝试传递到共享对象共享自定义类对象的方法时,都会收到此奇怪的错误。python版本:3.6.3 码: 错误: 这是什么问题 问题答案: 在这里找到了临时解决方案。我已经设法通过在 multiprocessing \ managers.py中 的AutoProxy的初始化程序中添加必要的关键字来解决此问题,但是我不知道此kwarg是否负责任何事情。
问题内容: 假设您正在使用一个对象,并且正在使用构造函数的设置来传递一个初始化函数,然后该初始化函数将在全局命名空间中创建资源。假设资源具有上下文管理器。如果上下文管理的资源必须在流程的整个生命周期中都可以使用,但是在最后要进行适当的清理,您将如何处理它的生命周期呢? 到目前为止,我有点像这样: 从这里开始,池进程可以使用资源。到现在为止还挺好。但是,由于类不提供or或参数,因此处理清理工作有些棘
问题内容: 我用python与selenium结合编写了一个脚本,以从其着陆页中抓取不同文章的链接,并通过跟踪引向其内页的url最终获得每个文章的标题。尽管我在这里解析的内容是静态内容,但我还是使用了selenium来查看它在多处理中的工作方式。 但是,我的意图是使用多处理进行抓取。到目前为止,我知道selenium不支持多处理,但似乎我错了。 我的问题:当使用多处理运行selenium时,如何减
问题内容: 我正在使用Keras与Tensorflow作为后端。 我正在尝试在主流程中保存模型,然后在另一个流程中加载/运行(即调用)。 我目前正在尝试从文档中使用天真的方法来保存/加载模型:https : //keras.io/getting-started/faq/#how-can-i-save-a- keras-model 。 所以基本上: 在主要过程中 在子进程中 在子进程中 但是,它只是
问题内容: 有什么方法可以使pip多个版本的Python正常运行吗?例如,我想用于pip将内容显式安装到我的站点2.5安装或站点2.6安装中。 例如,使用,我使用。 而且,是的-我了解,不是-这不是解决此特定问题的方法。 问题答案: 在目前的建议是使用,这里python是Python的版本,你想使用。这是建议,因为它适用于所有版本的和所有形式的。例如: 先前的答案,留给后代: 从0.8版开始,支持
我有一个图像路径列表,我想在进程或线程之间划分,以便每个进程处理列表的某些部分。处理包括从磁盘加载图像,进行一些计算并返回结果。我正在使用Python 2.7 下面是我如何创建辅助进程 我所面临的问题是,当我在initializer函数中记录初始化时间时,我知道worker不是并行初始化的,而是每个worker都以5秒的间隔初始化,下面是供参考的日志 我尝试过使用将同时启动辅助线程 我知道Wind