当前位置: 首页 > 知识库问答 >
问题:

云数据流/波束-PCollection查找另一个PCollection

戎亦
2023-03-14

a)从有界源读取,在数据流中运行时,PCollection的大小可以有多大?b)当处理大数据时,假设PCollection的大约5000万个数据试图查找另一个PCollection的大约1000万个数据。这能做到吗?beam/dataflow的性能有多好?在一个ParDo函数中,假设我们只能传递一个输入并返回一个输出,如何基于两个输入数据集执行查找?我试图查看Dataflow/beam,类似于任何其他ETL工具,在那里简单的查找可能会创建一个新的PCollection。请提供任何代码片段,这可能会有所帮助。

我也看到了侧输入功能,但是侧输入真的能容纳那个大数据集吗?

共有1个答案

相洛华
2023-03-14

您绝对可以使用侧输入来实现这一点,因为侧输入可能任意大。

在Java中,您可以执行如下操作:

Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<...>> lookupCollection = pipeline
   .apply(new ReadMyLookupCollection())
   .apply(View.asMap());


PCollection<..> mainCollection = pipeline
    .apply(new ReadMyPCollection())
    .apply(
        ParDo.of(new JoinPCollsDoFn()).withSideInputs(lookupCollection));

class JoinPCollsDoFn<...> extends DoFn<...> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Map<...> siMap = c.sideInput(lookupCollection);
    String lookupKey = c.element().lookupKey;
    AugmentedElement result = c.element().mergeWith(siMap.get(lookupKey))
    c.output(result);
  }
}

FWIW,这有点伪代码,但它是您想要做的事情的一个片段。如果你想进一步澄清,请告诉我。

 类似资料:
  • 我们有一个波束/数据流管道(使用数据流SDK 2.0.0-beta3 但是,我们正在设置 参数,我们可以看到所有二进制文件/jar 等都已上传到我们在 参数中指定的存储桶。 但是,Beam/Dataflow 随后会在我们项目的 GCS 中创建以下僵尸存储桶: 为什么会发生这种情况,如果我们清楚地设置参数?

  • null 我注意到这太慢了--CPU资源只被利用了几%。我怀疑每个节点都得到了一个zip文件,但是工作不是在本地CPU之间分配的--所以每个节点只有一个CPU在工作。我不明白为什么会这样,因为我使用了平面地图。

  • 我们正在为Apache Beam管道构建一个集成测试,并遇到了一些问题。有关上下文,请参见下文... 有关我们管道的详细信息: null null 这是一个简单的集成测试,它将验证我们的管道作为一个整体的行为是预期的。 我们目前面临的问题是,当我们运行管道时,它正在阻塞。我们使用的是和(而不是),但是测试似乎在运行管道后挂起。因为这是一个无界的(以流模式运行),所以管道不会终止,因此没有到达管道之

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。

  • 我对GCP、Dataflow、Apache Beam、Python和一般的OOP都是新手。我来自函数式javascript领域,对于上下文。 现在,我已经用Apache Beam python sdk构建了一个流管道,并将其部署到GCP的数据流中。管道的源是pubsub订阅,接收器是数据存储。 管道从pubsub订阅中获取消息,根据配置对象+消息内容做出决定,然后根据做出的决定将其放在数据存储中的

  • 这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr