Question:

Google Cloud Composer (Airflow) - Dataflow job inside the DAG executes successfully, but the DAG fails

锺离穆冉
2023-03-14

My DAG looks like this:

import datetime

import airflow
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'dataflow_default_options': {
        'project': 'test',
        'tempLocation': 'gs://test/dataflow/pipelines/temp/',
        'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '1',
        'region': 'asia-east1'
    }
}

dag = DAG(
    dag_id='gcs_avro_to_bq_dag',
    default_args=default_args,
    description='ETL for loading data from GCS(present in the avro format) to BQ',
    schedule_interval=None,
    dagrun_timeout=datetime.timedelta(minutes=30))

task = DataFlowJavaOperator(
    task_id='gcs_avro_to_bq_flow_job',
    jar='gs://test/dataflow/pipelines/jobs/test-1.0-SNAPSHOT.jar',
    poll_sleep=1,
    options={
        'input': '{{ ts }}',
    },
    dag=dag)

My DAG runs a jar file. The jar contains the code for a Dataflow job that writes data from GCS to BQ. The jar itself executes successfully.
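For context, my understanding of the legacy DataFlowJavaOperator (a simplified sketch, not the actual Airflow source) is that it merges dataflow_default_options and options into --key=value flags, runs the jar locally as a subprocess, and then scrapes the Dataflow job id out of the process logs so it can poll the job until it finishes. Roughly:

# Simplified sketch of how the legacy operator turns the DAG options above
# into a local "java -jar" invocation; the rendered value of '{{ ts }}' below
# is made up for illustration.
options = {
    'project': 'test',
    'region': 'asia-east1',
    'tempLocation': 'gs://test/dataflow/pipelines/temp/',
    'stagingLocation': 'gs://test/dataflow/pipelines/staging/',
    'input': '2020-05-20T17:20:00+00:00',  # rendered '{{ ts }}'
}
command = ['java', '-jar', 'test-1.0-SNAPSHOT.jar'] + [
    '--{}={}'.format(key, value) for key, value in options.items()
]
# -> ['java', '-jar', 'test-1.0-SNAPSHOT.jar', '--project=test', '--region=asia-east1', ...]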

When I try to run the Airflow task, I see the following error:

[2020-05-20 17:20:41,934] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,840] {gcp_api_base_hook.py:97} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-05-20 17:20:41,937] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:41,853] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest
[2020-05-20 17:20:44,338] {base_task_runner.py:101} INFO - Job 274: Subtask gcs_avro_to_bq_flow_job [2020-05-20 17:20:44,338] {discovery.py:873} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json
[2020-05-20 17:20:45,285] {__init__.py:1631} ERROR - <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1?alt=json returned "(7e83a8221abb0a9b): Information about job asia-east1 could not be found in our system. Please double check the id is correct. If it is please contact customer support.">
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1491, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 184, in execute
    self.jar, self.job_class)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 220, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 286, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 200, in _start_dataflow
    self.poll_sleep, job_id).wait_for_done()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 44, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 63, in _get_job
    jobId=self._job_id).execute(num_retries=5)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
    raise HttpError(resp, content, uri=self.uri)

I did some more digging and I can see that Airflow is requesting https://dataflow.googleapis.com/v1b3/projects/test/locations/asia-east1/jobs/asia-east1

As you can see, the last path segment after jobs is asia-east1, so it looks like the Airflow task is trying to look up the status of the Dataflow job using the region I supplied in the default args instead of the job id. I'm not sure that is what is actually happening, but I wanted to state the observation. Am I missing something in my DAG? My Java job logic looks like this:

public class GcsAvroToBQ {

    public interface Options extends PipelineOptions {
        @Description("Input")
        ValueProvider<String> getInput();

        void setInput(ValueProvider<String> value);
    }

    /**
     * Main entry point for executing the pipeline.
     *
     * @param args The command-line arguments to the pipeline.
     */
    public static void main(String[] args) {

        GcsAvroToBQ.Options options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(GcsAvroToBQ.Options.class);

        options.getJobName();

        run(options);
    }

    public static PipelineResult run(Options options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // My Pipeline logic to read Avro and upload to BQ

        PCollection<TableRow> tableRowsForBQ; // Data to store in BQ
        tableRowsForBQ.apply(
                BigQueryIO.writeTableRows()
                        .to(bqDatasetName)
                        .withSchema(fieldSchemaListBuilder.schema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));


        return pipeline.run();
    }
}

2 Answers

彭朝
2023-03-14

Since the fix has not been released yet, even though it has already been merged into master, I will add the following workaround for anyone who needs to use a Beam SDK version newer than 2.19.0.

The idea is to implement the fix in a custom hook (identical to dataflow_hook.py but with the proposed change applied) and then implement a custom operator that uses this hook. This is how I did it:

First, I created a file named my_dataflow_hook.py:

import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()
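For reference, the behavioral change is entirely in _extract_job: with newer Beam SDK versions the monitoring-console URL printed in the job logs contains an extra region segment between jobs/ and the job id, so a single-segment pattern like the one in the stock 1.10.x hook captures the region (asia-east1 in the question) instead of the job id. Below is a minimal sketch of the difference, using a made-up log line and job id; the "old" pattern is my recollection of the stock hook and may not match your exact Airflow version.

import re

# Hypothetical log line in the newer format: note the region segment between
# "jobs/" and the job id (both values here are made up).
line = (b"To access the Dataflow monitoring console, please navigate to "
        b"https://console.cloud.google.com/dataflow/jobs/asia-east1/"
        b"2020-05-20_10_20_00-1234567890123456789?project=test")

# Single-segment pattern (as in the stock 1.10.x hook, to the best of my
# knowledge): stops at the first path segment after "jobs/", so it returns
# the region.
old_pattern = re.compile(
    br".*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*")

# Patched pattern (same as _myDataflow above): skips the intermediate segment
# and captures the last one, i.e. the actual job id.
new_pattern = re.compile(
    br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")

print(old_pattern.search(line).group(1).decode())  # asia-east1
print(new_pattern.search(line).group(1).decode())  # 2020-05-20_10_20_00-1234567890123456789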

Then I created a file named my_dataflow_java_operator.py:

import copy

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowJavaOperator(DataFlowJavaOperator):
    def execute(self, context):
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.jar = bucket_helper.google_cloud_to_local(self.jar)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)

        dataflow_options = copy.copy(self.dataflow_default_options)
        dataflow_options.update(self.options)

        hook.start_java_dataflow(self.job_name, dataflow_options,
                                 self.jar, self.job_class)

class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""

    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowJavaOperator]

Finally, I uploaded these files to the Composer environment's bucket with the following structure:

├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_java_operator.py

Now I can create the task in my DAG using MyDataFlowJavaOperator:

from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowJavaOperator
...
with DAG("df-custom-test", default_args=default_args) as dag:
    test_task = MyDataFlowJavaOperator(dag=dag, task_id="df-java", jar=JAR_FILE, job_name=JOB_NAME)

Of course, you can do the same with the DataFlowPythonOperator or the DataflowTemplateOperator if you need to.

公孙新觉
2023-03-14

This is a confirmed bug in SDK version 2.20.0:

https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/hooks/dataflow.py#L47

Please use version 2.19.0 instead; it should work correctly.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.19.0</version>
  <scope>runtime</scope>
</dependency>