当前位置: 首页 > 面试题库 >

将大文件中的数据分块进行多处理?

松俊才
2023-03-14
问题内容

我正在尝试使用多重处理来并行化应用程序,该处理程序会处理一个非常大的csv文件(64MB至500MB),逐行执行一些工作,然后输出一个固定大小的小文件。

目前,我正在执行list(file_obj),不幸的是,它已完全加载到内存中(我认为),然后我将该列表分成了n个部分,n是我要运行的进程数。然后,我pool.map()在分类列表上执行。

与单线程,仅打开文件并迭代的方法相比,这似乎具有非常非常糟糕的运行时。有人可以提出更好的解决方案吗?

此外,我需要按组处理文件中的行,以保留特定列的值。这些行组本身可以拆分,但是该列中的任何组都不能包含多个值。


问题答案:

list(file_obj)大的时候可能需要很多内存fileobj。我们可以通过使用itertools根据需要拉出几行代码来减少内存需求。

特别是,我们可以使用

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

将文件拆分为可处理的块,以及

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

使多处理池一次处理多个num_chunks块。

这样,我们大约只需要足够的内存即可在内存中保存几个(num_chunks)块,而不是整个文件。

import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()


 类似资料:
  • 问题内容: 如何将大数据文件分块写入CSV文件? 我有一组大型数据文件(1M行x 20列)。但是,我只关注该数据的5列左右。 我想通过只用感兴趣的列制作这些文件的副本来使事情变得更容易,所以我可以使用较小的文件进行后期处理。因此,我计划将文件读取到数据帧中,然后写入csv文件。 我一直在研究将大数据文件以块的形式读入数据框。但是,我还无法找到有关如何将数据分块写入csv文件的任何信息。 这是我现在

  • 我有两个大约150 MB的大txt文件。我想从file1的每一行读取一些数据,并扫描file2的所有行,直到找到匹配的数据。如果没有找到匹配的数据,我希望将该行输出到另一个文件中。 选项2:使用上面提到的三个读取器对文件1中的每个记录读取文件2 n次。每次读取后,我必须关闭文件并再次读取。我在想最好的办法是什么。我还有别的选择吗

  • 问题内容: 我用来并行化一些繁重的计算。 目标函数返回大量数据(庞大的列表)。我的RAM用完了。 如果不使用,我只需将生成的元素依次计算出来,就将目标函数更改为生成器。 我了解多处理不支持生成器- 它等待整个输出并立即返回,对吗?没有屈服。有没有一种方法可以使工作人员在数据可用时立即产生数据,而无需在RAM中构造整个结果数组? 简单的例子: 这是Python 2.7。 问题答案: 这听起来像是队列

  • 问题 你想将一个模块分割成多个文件。但是你不想将分离的文件统一成一个逻辑模块时使已有的代码遭到破坏。 解决方案 程序模块可以通过变成包来分割成多个独立的文件。考虑下下面简单的模块: # mymodule.py class A: def spam(self): print('A.spam') class B(A): def bar(self): pr

  • 问题内容: 我已使用从IMDB收集信息并将其传输到MYSQL数据库的应用程序导入了一些数据。 似乎这些字段尚未标准化,并且在1个字段中包含许多值 例如: 有没有办法将这些值分开,然后将它们插入到另一个表中,而不重复呢? 我进行了一些谷歌搜索,发现我应该使用PHP处理此数据。但是我一点都不了解PHP。 无论如何,仅使用MYSQL即可转换此​​数据? 问题答案: 您可以使用存储过程,该过程使用游标来解

  • 问题内容: 我目前正在尝试将以下大的制表符分隔的文件导入Python中类似数据框的结构中-自然,我正在使用数据框,尽管我愿意接受其他选择。 该文件大小为几GB,不是标准文件,它已损坏,即行的列数不同。一排可能有25列,另一排可能有21列。 这是数据示例: 如您所见,其中某些列的顺序不正确… 现在,我认为将文件导入数据框的正确方法是对数据进行预处理,以便可以输出带有值的数据框,例如 更复杂的是,这是