当前位置: 首页 > 知识库问答 >
问题:

Apache Beam数据流读取带有splittable=true的大CSV,导致重复条目

顾淳
2023-03-14

我使用下面的代码片段将CSV文件作为dict读入管道。

class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, range_tracker):
        self._file = self.open_file(file_name)

        reader = csv.DictReader(self._file, dialect=MyCustomDialect)

        for rec in reader:
            yield rec
splittable (bool): whether :class:`FileBasedSource` should try to
logically split a single file into data ranges so that different parts
of the same file can be read in parallel. If set to :data:`False`,
:class:`FileBasedSource` will prevent both initial and dynamic splitting
of sources for single files. File patterns that represent multiple files
may still get split into sources for individual files. Even if set to
:data:`True` by the user, :class:`FileBasedSource` may choose to not
split the file, for example, for compressed files where currently it is
not possible to efficiently read a data range without decompressing the
whole file.

在并行读取源文件时可能会有一些问题吗?是不是有什么我忽略了或者没有用正确的方式去照顾?

共有1个答案

谢翰学
2023-03-14

为了支持没有重复的拆分,您必须在从源代码读取时使用传递的'range_tracker'对象。例如,在声明正在读取的文件的唯一位置时,必须调用try_claim()。

更多信息请见下文。https://beam.apache.org/documentation/sdks/python-custom-io/

 类似资料:
  • pyspark新手,希望将csv文件读取到数据帧。似乎不能让人读。有什么帮助吗? ()中的Py4JJavaError回溯(最近一次调用)----

  • 问题内容: 我正在读取csv并与mysql检查记录是否存在于我的表中或不在php中。 csv大约有25000条记录,当我运行我的代码时,它在2m 10s后显示“服务不可用”错误(加载:2m 10s) 在这里我添加了代码 注意:我只想列出表中不存在的记录。 请为我建议解决方案… 问题答案: 首先,您应该了解,在使用file_get_contents时,您会将整个数据字符串提取到一个变量中,该变量存储

  • 我退出新的MongoDB。我有一个收藏,里面几乎没有文件。下面是一个例子。 我想获得数组中所有条目的标记以显示它们,并使用java进行一些计算。到目前为止,我已经完成了阅读文档和显示所有数据的工作。但我找不到一种方法,只能从中得到“标记”。 到目前为止,关于使用Java从MongoDB读取数据,我只知道这些。请帮助我将文档中的“标记”字段仅获取到数组中<谢谢你。

  • 我有个问题: 我想创建一个chatwebapp,并使用Bootstrap进行布局。不幸的是,在下面的代码中,我得到了一个不应该存在的水平滚动条。我使用列和行,我知道在引导css中有一个填充,但当我试图移除它时,实际上什么也没有发生,滚动条仍停留在那里。也许你们能帮我。 null null

  • 问题内容: 我有大型CSV,我只对这些行的子集感兴趣。特别是,我想读取在满足特定条件之前发生的所有行。 例如,如果将产生数据框: 有什么方法可以读取csv中的所有行,直到col B超过10。在上面的示例中,我想读入: 我知道在读入数据帧后如何将这些行扔掉,但是到现在为止,我已经花了所有的计算来读入它们。在读取csv之前,我无法访问最后一行的索引请不要跳过脚) 问题答案: 您可以分批读取csv。由于

  • 问题内容: 我正在尝试根据我已经拥有的csv检查提取数据的值。它只会循环遍历CSV的行一次,我只能检查feed.items()的一个值。我需要在某个地方重置值吗?有没有更好/更有效的方法来做到这一点?谢谢。 问题答案: 您可以通过重置文件对象的读取位置来“重置” CSV迭代器。