在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。
如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?
如果您要提交到数据流,我相信这可能有效:
DataflowPipelineJob result = (DataflowPipelineJob)pipeline.run()
result.getJobId()
但是您无法在管道本身的 afaik(DoFns 等)中访问它。
确保您知道自己的工作ID/名称的最佳方法是自己设置。您可以通过设置 --jobName
来执行此操作,这可以通过选项.getJobName()
访问,数据流将使用它。请注意,它必须是唯一的。
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?
我已经开始使用Scala SDK Scio开发我的第一个DataFlow工作。数据流作业将以流模式运行。 有谁能建议最好的部署方法吗?我已经在Scio文档中阅读了他们使用的,然后将其部署到Docker容器中。我也读过关于使用数据流模板的内容(但不是很详细)。 什么是最好的?
当我通过命令行运行Beam管道时,使用direct runner或dataflow runner,它工作得很好。。。 例子: 但是当我尝试使用空气流时,我有两个选项,bash操作符或python操作符。 使用bash操作符会成功,但会限制我使用气流功能的能力。 但是我想做的是作为python操作员运行它。所以我将模块导入到airflow dg文件中,然后作为python操作符运行它。 如果我使用本
我用下面的命令创建了一个数据流模板 数据流不为table_view2调用此函数,而是为该作业使用table_view。
目前,我们正在库伯内特斯上使用自己安装的气流版本,但想法是在云作曲家上迁移。我们使用Airflow运行数据流作业,使用DataFlowJavaoperator的自定义版本(使用插件),因为我们需要执行java应用程序,而java应用程序不是在jar中自包含的。因此,我们基本上运行一个bash脚本,该脚本使用以下命令: 所有jar依赖项都存储在所有辅助角色之间的共享磁盘中,但是在Composer中缺
如何从胶水作业中检索胶水工作流参数? 我有一个“Python Shell”类型的AWS胶水作业,它从胶水工作流中定期触发。 该作业的代码将在大量不同的工作流中重用,因此我希望检索工作流参数,以消除对冗余作业的需求。 AWS开发人员指南提供了以下教程:https://docs.AWS.amazon.com/glue/latest/dg/workflow-run-properties-code.htm