当前位置: 首页 > 知识库问答 >
问题:

Google Dataflow批处理文件性能差

仰翔
2023-03-14

我试图使用Apache Beam2.16.0构建一个流水线来处理大量的XML文件。平均每24小时的计数是7000万,在高峰负载时,它可以上升到5亿。文件大小从1 kb到200 kb不等(有时甚至更大,例如30 mb)

文件经过各种转换,最终目标是BigQuery表,以便进一步分析。因此,我首先读取xml文件,然后反序列化为POJO(在Jackson的帮助下),然后应用所有所需的转换。转换工作得非常快,在我的机器上,根据文件大小,我每秒可以得到大约40000个转换。

我主要关心的是文件读取速度。我感觉所有的阅读都是通过一个工人完成的,我不明白这怎么能并行不悖。我在10K测试文件数据集上进行了测试。

为什么这会在处理速度上产生如此大的差异?

运行参数:

--runner=DataflowRunner
--project=<...>
--inputFilePattern=gs://java/log_entry/*.xml
--workerMachineType=n1-standard-4
--tempLocation=gs://java/temp
--maxNumWorkers=100

运行区域和桶区域相同。

pipeline.apply(
  FileIO.match()
    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
    .filepattern(options.getInputFilePattern())
    .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
  .apply("xml to POJO", ParDo.of(new XmlToPojoDoFn()));
<LogEntry><EntryId>0</EntryId>
    <LogValue>Test</LogValue>
    <LogTime>12-12-2019</LogTime>
    <LogProperty>1</LogProperty>
    <LogProperty>2</LogProperty>
    <LogProperty>3</LogProperty>
    <LogProperty>4</LogProperty>
    <LogProperty>5</LogProperty>
</LogEntry>

如果我可以在我的机器上处理大约750个文件/秒(这是一个强大的worker),那么我希望在DataFlow中类似的10个worker上处理大约7500个文件/秒。

共有1个答案

太叔栋
2023-03-14

我试图创建一个带有某些功能的测试代码,以检查fileio.match的行为和工作人员的数量[1]。

在这段代码中,我将值numWorkers设置为50,但是您可以设置您需要的值。我可以看到的是,fileio.match方法将查找与这些模式匹配的所有链接,但在此之后,您必须分别处理每个文件的内容。

例如,在我的例子中,我创建了一个接收每个文件的方法,然后用“new_line(\n)”字符对内容进行分割(但是在这里您可以根据需要来处理它,这也取决于文件的类型,csv、xml、...)。

因此,我将每一行转换为tableRow,即BigQuery理解的格式,并分别返回每个值(out.output(tab)),这样,Dataflow将根据管道的工作负载在不同的worker中处理这些行,例如,在3个不同的worker中处理3000行,每个worker有1000行。

最后,由于这是一个批处理过程,Dataflow将等待处理所有行,然后将其插入到BigQuery中。

我希望这段测试代码对您的测试有所帮助。

 类似资料:
  • 主要内容:创建批处理文件,保存批处理文件,执行批处理文件,修改批处理文件在本章中,我们将学习如何创建,保存,执行和修改批处理文件。 创建批处理文件 批处理文件通常在记事本中创建。 因此,最简单的方法是打开记事本并输入脚本所需的命令。 对于这个练习,打开记事本并输入以下语句。 保存批处理文件 在创建批处理文件后,下一步是保存批处理文件。 批处理文件的扩展名为或。 命名批处理文件时需要注意的一些常规规则 - 在命名批处理文件时避免使用空格,有时会在从其他脚本中调用时产生问

  • 问题内容: 我需要将几亿条记录插入mysql db。我要一次插入一百万个。请在下面查看我的代码。它似乎很慢。有什么方法可以优化它吗? 问题答案: 我在mysql中遇到类似的性能问题,并通过在连接URL中设置useServerPrepStmts和rewriteBatchedStatements属性来解决。

  • 我们开发了一个Spring批处理应用程序,其中我们有两个流程。1.向前2.向后。我们只使用文件读/写,不涉及数据库。 > 正向场景:输入文件将包含22个字段的记录。通过执行序列号生成和添加一些填充字段等操作,将22个字段转换为32个字段。根据国家代码,输出将被分成最多3个。每个块将有250K条记录。(如果记录以百万为单位,则将为同一国家生成多个文件)。 800万张唱片需要36分钟。 800万记录将

  • 使用图像处理器转换文件 图像处理器可以转换和处理多个文件。与“批处理”命令不同,您不必先创建动作,就可以使用图像处理器来处理文件。您可以在图像处理器中执行下列任何操作: 将一组文件转换为 JPEG、PSD 或 TIFF 格式之一,或者将文件同时转换为所有三种格式。 使用相同选项来处理一组相机原始数据文件。 调整图像大小,使其适应指定的像素大小。 嵌入颜色配置文件或将一组文件转换为 sRGB,然后将

  • 本文向大家介绍javascript日期处理函数,性能优化批处理,包括了javascript日期处理函数,性能优化批处理的使用技巧和注意事项,需要的朋友参考一下 其实网上写javascript日期格式化的博文很多,大体都看了看,都还不错。唯一遗憾的是只顾着实现了功能,没对函数进行性能优化。 俗话说:不要重复造轮子。google上找了一个比较不错的日期格式化函数,来开始我的优化之旅吧! google上

  • 我们有一个spring批处理作业,我们试图处理大约1000万条记录。现在,在单线程中执行此操作将非常慢,因为我们必须匹配SLA。 为了提高性能,我们开发了一个POC,其中主步骤是创建分区,每个分区代表一个唯一的prod ID。这可以从500到4500之间的任何地方。在POC中,我们有500个这样唯一的产品ID。现在,每个分区都有一个prod id和一个步骤。所有这一切都很顺利。 我们注意到的是主步