我在Google Cloud Composer中从Airflow调用数据流作业,
a >> b >> c
a、 b和c是调用数据流作业的任务。我只想在数据流作业完成后运行b,问题是它们都同时运行。
我怎么能等到之前的工作完成?
放置作业后,您需要放置一个传感器,以验证作业是否完成。
例子:
start_python_job_async = DataflowCreatePythonJobOperator(
task_id="start-python-job-async",
py_file=GCS_PYTHON,
py_options=[],
job_name='{{task.task_id}}',
options={
'output': GCS_OUTPUT,
},
py_requirements=['apache-beam[gcp]==2.25.0'],
py_interpreter='python3',
py_system_site_packages=False,
location='europe-west3',
wait_until_finished=False,
)
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location='europe-west3',
)
start_python_job_async >> wait_for_python_job_async_done
您可以查看进一步解释的文档和示例
我有一个flink流媒体作业,它从Kafka读取数据并写入文件系统中适当的分区。例如,作业被配置为使用一个bucketing接收器,该接收器写入/数据/日期=${date}/小时=${hour}。 如何检测分区是否已准备好使用,以便相应的气流管道可以在这一小时内进行批处理?
我的狗看起来像这样 我的DAG正在执行一个jar文件。jar文件包含运行数据流作业的代码,该作业将数据从GCS写入BQ。jar本身执行成功。 当我尝试执行airflow作业时,我看到以下错误 我做了更多的挖掘,我可以看到气流 正如您可以看到jobs之后的最后一个参数是asia east,因此我觉得airflow job正在尝试使用我在默认参数中提供的区域来搜索数据流job的状态。不确定这是否是正在
我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?
在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?
我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?