我试图通过气流脚本执行数据流罐。对于它,我使用DataFlowJavaoperator。在参数jar中,我传递本地系统中存在的可执行jar文件的路径。但是当我尝试运行这个工作我得到错误作为
{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
task_id, variables, dataflow, name, ["java", "-jar"])
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
_Dataflow(cmd).wait_for_done()
File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
self._proc.returncode))
Exception: DataFlow failed with return code 1`
我的脚本是:
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2017, 03, 16),
'email': [<EmailID>],
'dataflow_default_options': {
'project': '<ProjectId>',
# 'zone': 'europe-west1-d', (i am not sure what should i pass here)
'stagingLocation': 'gs://spark_3/staging/'
}
}
dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2),
default_args=default_args)
dataflow1 = DataFlowJavaOperator(
task_id='dataflow_example',
jar ='/root/airflow_scripts/csvwriter.jar',
gcp_conn_id = 'GCP_smoke',
dag=dag)
我不知道我犯了什么错误,有人能帮我摆脱困境吗
注意:我正在创建这个jar,同时通过打包所有外部依赖项选择选项作为可运行的jar文件
问题出在我用的罐子上。在使用jar之前,请确保jar按预期执行。
示例:如果您的jar是dataflow_job1。jar,使用
java -jar dataflow_job_1.jar --parameters_if_any
jar成功运行后,继续在DataflowJavaOperator jar中使用jar。
此外,如果您遇到与编码器相关的错误,您可能必须使自己的编码器来执行代码。例如,我有一个问题与TableRow类,因为它没有一个默认的编码器,因此我必须弥补这一点:
表格行编码器:
public class TableRowCoder extends Coder<TableRow> {
private static final long serialVersionUID = 1L;
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of();
@Override
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException {
tableRow.encode(value, outStream);
}
@Override
public TableRow decode(InputStream inStream) throws CoderException, IOException {
return new TableRow().set("F1", tableRow.decode(inStream));
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
// TODO Auto-generated method stub
return null;
}
@Override
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
}
}
然后在你的代码中注册这个编码器
pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder())
如果仍然存在错误(与编码器无关),请导航到:
*.jar\META-INF\services\FileSystemRegistrar
并添加可能出现的任何依赖项。
例如,可能存在以下暂存错误:
Unable to find registrar for gs
我不得不添加以下行使其工作。
org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar
我正在尝试执行一个DataflowPython文件,该文件使用DataFlowPythonOperator通过气流DAG从GCS存储桶读取文本文件。我已经能够独立地执行python文件,但是当我通过airflow执行它时,它失败了。我正在使用服务帐户对默认gcp连接进行身份验证。我在执行作业时遇到的错误是: 我的脚本: 我的数据流python文件(1readfromgcs.py)包含以下代码: 感
我在运行flutter项目时遇到了这个错误:我已经很努力地遵循了其他internet步骤。我不知道为什么我会看到这个问题。你能帮我做这件事吗? 正在以调试模式在为x86构建的Android SDK上启动lib\main.dart...正在运行分级任务“Assemble Debug”... buildscript{
失败:生成失败,出现异常。 出了什么问题:无法打开设置文件。 缺陷源单元“BuildScript”中的“语义分析”阶段出现异常不受支持的类文件主版本60 > 尝试:使用--stackTrace选项运行以获取堆栈跟踪。使用--info或--debug选项运行以获取更多日志输出。运行--can以获得完整的见解。 在https://help.gradle.org获得更多帮助 生成在605ms内失败异常:
我不明白为什么会发生这种情况。造成这种情况的原因是什么?如何解决这种情况?
如何解决安装Flutter时的错误“异常:Gradle task assembleDebug失败,退出代码为-1”?
我试图安装一些需要的扩展PHP使用docker。 这是我的Dockerfile: 下面是我得到的错误: 错误: /usr/src/php/ext/mysql不存在 用法:/usr/local/bin/docker php ext install[-jN]ext name[ext name…]ie:/usr/local/bin/docker php ext install gd mysqli/usr