我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题:
如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示:
(
p | 'Do one thing' >> beam.Create(List1)
)
(
p | 'Do second thing' >> beam.Create(List2)
)
result = p.run()
result.wait_until_finish()
我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?
您是正确的:图的这两个部分将在相同的数据流作业下运行。
数据流动态地并行工作项的执行。这意味着根据您的管道、它的工作量等,Dataflow可能决定在同一员工或不同员工上调度。
通常,如果在两个并行管道上有足够的工作(每个大约20个),那么是的:它们将并行运行。
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
我正在开发一个物联网应用程序,需要从PubSub主题读取流数据。我想使用Google云数据流SDK读取这些数据。我正在使用Java 1.8 我正在使用谷歌云平台的试用版。当我使用PubSubIO时。Read方法读取流数据时,我在日志文件中发现错误,我的项目没有足够的CPU配额来运行应用程序。 所以我想使用谷歌云数据流SDK读取流数据。 请有人告诉我在哪里可以找到使用Google Cloud Dat
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?
有人能帮我做这个吗?
目前,我们正在库伯内特斯上使用自己安装的气流版本,但想法是在云作曲家上迁移。我们使用Airflow运行数据流作业,使用DataFlowJavaoperator的自定义版本(使用插件),因为我们需要执行java应用程序,而java应用程序不是在jar中自包含的。因此,我们基本上运行一个bash脚本,该脚本使用以下命令: 所有jar依赖项都存储在所有辅助角色之间的共享磁盘中,但是在Composer中缺