当前位置: 首页 > 知识库问答 >
问题:

Airflow异常:返回代码为1的DataFlow失败

皇甫雨石
2023-03-14

我试图通过气流脚本执行数据流罐。对于它,我使用DataFlowJavaoperator。在参数jar中,我传递本地系统中存在的可执行jar文件的路径。但是当我尝试运行这个工作我得到错误作为

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
    _Dataflow(cmd).wait_for_done()
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
    self._proc.returncode))
Exception: DataFlow failed with return code 1`

我的脚本是:

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'start_date': datetime(2017, 03, 16),
'email': [<EmailID>],

'dataflow_default_options': {
        'project': '<ProjectId>',
       # 'zone': 'europe-west1-d', (i am not sure what should i pass here)
        'stagingLocation': 'gs://spark_3/staging/'
    }
 }

 dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2), 
 default_args=default_args)

 dataflow1 = DataFlowJavaOperator(
 task_id='dataflow_example',
 jar ='/root/airflow_scripts/csvwriter.jar',
 gcp_conn_id  = 'GCP_smoke', 
 dag=dag)

我不知道我犯了什么错误,有人能帮我摆脱困境吗

注意:我正在创建这个jar,同时通过打包所有外部依赖项选择选项作为可运行的jar文件


共有1个答案

姬奇思
2023-03-14

问题出在我用的罐子上。在使用jar之前,请确保jar按预期执行。

示例:如果您的jar是dataflow_job1。jar,使用

java -jar dataflow_job_1.jar --parameters_if_any

jar成功运行后,继续在DataflowJavaOperator jar中使用jar。

此外,如果您遇到与编码器相关的错误,您可能必须使自己的编码器来执行代码。例如,我有一个问题与TableRow类,因为它没有一个默认的编码器,因此我必须弥补这一点:

表格行编码器:

public class TableRowCoder extends Coder<TableRow> {
private static final long serialVersionUID = 1L;
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of();
@Override
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException {
    tableRow.encode(value, outStream);

}
@Override
public TableRow decode(InputStream inStream) throws CoderException, IOException {
    return new TableRow().set("F1", tableRow.decode(inStream));
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
    // TODO Auto-generated method stub
    return null;
}
@Override
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {


}
}

然后在你的代码中注册这个编码器

pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder())

如果仍然存在错误(与编码器无关),请导航到:

*.jar\META-INF\services\FileSystemRegistrar 

并添加可能出现的任何依赖项。

例如,可能存在以下暂存错误:

Unable to find registrar for gs

我不得不添加以下行使其工作。

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar
 类似资料:
  • 我正在尝试执行一个DataflowPython文件,该文件使用DataFlowPythonOperator通过气流DAG从GCS存储桶读取文本文件。我已经能够独立地执行python文件,但是当我通过airflow执行它时,它失败了。我正在使用服务帐户对默认gcp连接进行身份验证。我在执行作业时遇到的错误是: 我的脚本: 我的数据流python文件(1readfromgcs.py)包含以下代码: 感

  • 我在运行flutter项目时遇到了这个错误:我已经很努力地遵循了其他internet步骤。我不知道为什么我会看到这个问题。你能帮我做这件事吗? 正在以调试模式在为x86构建的Android SDK上启动lib\main.dart...正在运行分级任务“Assemble Debug”... buildscript{

  • 失败:生成失败,出现异常。 出了什么问题:无法打开设置文件。 缺陷源单元“BuildScript”中的“语义分析”阶段出现异常不受支持的类文件主版本60 > 尝试:使用--stackTrace选项运行以获取堆栈跟踪。使用--info或--debug选项运行以获取更多日志输出。运行--can以获得完整的见解。 在https://help.gradle.org获得更多帮助 生成在605ms内失败异常:

  • 我不明白为什么会发生这种情况。造成这种情况的原因是什么?如何解决这种情况?

  • 如何解决安装Flutter时的错误“异常:Gradle task assembleDebug失败,退出代码为-1”?

  • 我试图安装一些需要的扩展PHP使用docker。 这是我的Dockerfile: 下面是我得到的错误: 错误: /usr/src/php/ext/mysql不存在 用法:/usr/local/bin/docker php ext install[-jN]ext name[ext name…]ie:/usr/local/bin/docker php ext install gd mysqli/usr