我对Apache Beam Python SDK定义的数据流有一些问题。如果我单步执行我的代码,它将到达pipeline.run()步骤,我认为这意味着成功定义了执行图。然而,该作业从未在数据流监视工具上注册,这使我认为它从未到达管道验证步骤。
我想更多地了解这两个步骤之间发生了什么,以帮助调试该问题。我看到的输出表明requirements.txt
和Apache-beam
中的包正在安装pip,似乎有些东西在发送到Google的服务器之前已经被picked了。为什么会这样?如果我已经下载了apache-beam,为什么还要再下载它呢?腌制到底是什么?
我不是在这里寻找我的问题的解决方案,只是试图更好地理解这个过程。
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?
在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
我已经开始使用Scala SDK Scio开发我的第一个DataFlow工作。数据流作业将以流模式运行。 有谁能建议最好的部署方法吗?我已经在Scio文档中阅读了他们使用的,然后将其部署到Docker容器中。我也读过关于使用数据流模板的内容(但不是很详细)。 什么是最好的?
我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中
使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错: