当前位置: 首页 > 知识库问答 >
问题:

来自单个Google云数据流作业的并行数据流流水线

子车凌龙
2023-03-14

我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题:

  • 一个数据流作业中的并行管道

如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示:

    (
            p | 'Do one thing' >> beam.Create(List1)
    )
    (
            p | 'Do second thing' >> beam.Create(List2)
    )       
result = p.run()
result.wait_until_finish()  

我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?

共有1个答案

南宫云
2023-03-14

您是正确的:图的这两个部分将在相同的数据流作业下运行。

数据流动态地并行工作项的执行。这意味着根据您的管道、它的工作量等,Dataflow可能决定在同一员工或不同员工上调度。

通常,如果在两个并行管道上有足够的工作(每个大约20个),那么是的:它们将并行运行。

 类似资料:
  • 我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以

  • 我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat

  • 有人能帮我做这个吗?

  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 目前,我们正在库伯内特斯上使用自己安装的气流版本,但想法是在云作曲家上迁移。我们使用Airflow运行数据流作业,使用DataFlowJavaoperator的自定义版本(使用插件),因为我们需要执行java应用程序,而java应用程序不是在jar中自包含的。因此,我们基本上运行一个bash脚本,该脚本使用以下命令: 所有jar依赖项都存储在所有辅助角色之间的共享磁盘中,但是在Composer中缺