当前位置: 首页 > 知识库问答 >
问题:

Beam/数据流中的批处理PCollection

池阳伯
2023-03-14

我在GCP数据流/Apache Beam中有一个PCollection。我需要将“按N”组合起来,而不是逐个处理它。类似于分组(N)。因此,在有界处理的情况下,它将按10个项目进行分组,最后一批是剩下的任何项目。这在Apache Beam中可能吗?

共有1个答案

百里骏
2023-03-14

编辑,类似于:谷歌数据流“elementCountExact”聚合

您应该能够通过将元素分配给全局窗口并使用AfterPane来执行类似的操作。元素计数至少(N)。您仍然需要考虑如果没有足够的元素触发触发器会发生什么。您可以使用此选项:

 Repeatedly.forever(AfterFirst.of(
  AfterPane.elementCountAtLeast(N),
  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(X))))

但是你应该问自己为什么首先需要这种启发式方法,可能有更多的偶像化方法来解决你的问题。在Beam的编程指南中阅读有关Data-Driven触发器

 类似资料:
  • 我目前正在使用Python API开发一个更大的Apache Beam管道,它从BigQuery中读取数据,并最终将其写回另一个BigQuery任务。 其中一个转换需要使用二进制程序来转换数据,为此,它需要加载一个23GB的二进制查找数据文件。因此,启动和运行该程序需要大量的开销(每次加载/运行大约需要2分钟)和RAM,并且仅为一条记录启动该程序是没有意义的。此外,每次都需要将23GB文件从云存储

  • 顺便说一句:我的应用程序是一些REST控制器和一些批处理作业的组合。那么使用云数据流有意义吗?如果没有,那么是否有更好的控制台管理器用于批处理作业(如重新启动、取消作业门户)等?

  • 我有一个Pub/Sub主题,它会定期(通常每隔几天或几周一次,但有时更频繁)接收批量消息。我想启动一个批处理数据流作业来读取这些消息,执行一些转换,将结果写入Datastore,然后停止运行。当新一批消息发出时,我想启动一项新工作。我已经阅读了Apache Beam PythonSDK文档和许多SO问题,但仍不确定一些事情。 Pub/Sub IO可以作为非流作业的一部分读取吗?然后同一作业可以使用

  • 我需要访问两个数据源: Spring批处理存储库:在内存H2中 我的步骤需要访问。 我在那里看到了几个关于如何创建自定义

  • 我的数据库中有大约1000万个blob格式的文件,我需要转换并以pdf格式保存它们。每个文件大小约为0.5-10mb,组合文件大小约为20 TB。我正在尝试使用spring批处理实现该功能。然而,我的问题是,当我运行批处理时,服务器内存是否可以容纳那么多的数据?我正在尝试使用基于块的处理和线程池任务执行器。请建议运行作业的最佳方法是否可以在更短的时间内处理如此多的数据

  • 如何使用带有DataflowRunner的apache光束从Google BigQuery数据集获取表列表? 我找不到如何从指定的数据集中获取表。我想使用数据流的并行处理编程模型将表从位于美国的数据集迁移到位于欧盟的数据集。