我正在尝试执行一个DataflowPython文件,该文件使用DataFlowPythonOperator通过气流DAG从GCS存储桶读取文本文件。我已经能够独立地执行python文件,但是当我通过airflow执行它时,它失败了。我正在使用服务帐户对默认gcp连接进行身份验证。我在执行作业时遇到的错误是:
{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
{models.py:1417} ERROR - DataFlow failed with return code 2
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 182, in execute
self.py_file, self.py_options)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_python_dataflow
task_id, variables, dataflow, name, ["python"] + py_options)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
_Dataflow(cmd).wait_for_done()
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
self._proc.returncode))
Exception: DataFlow failed with return code 2
我的脚本:
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime, timedelta
# Default DAG parameters
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': <email>,
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime(2018, 4, 30),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dataflow_default_options': {
'project': '<Project ID>'
}
}
dag = DAG(
dag_id='df_dag_readfromgcs',
default_args=default_args,
schedule_interval=timedelta(minutes=60)
)
task1 = DataFlowPythonOperator(
task_id='task1',
py_file='~<path>/1readfromgcs.py',
gcp_conn_id='default_google_cloud_connection',
dag=dag
)
我的数据流python文件(1readfromgcs.py)包含以下代码:
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
import apache_beam.pipeline as pipeline
import apache_beam.io as beamio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
def runCode(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
default='<Input file path>',
help='File name')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--project=<project name>',
'--runner=DataflowRunner',
'--job_name=<job name>',
'--region=europe-west1',
'--staging_location=<GCS staging location>',
'--temp_location=<GCS temp location>'
])
pipeline_options = PipelineOptions(pipeline_args)
p = beam.pipeline.Pipeline(options=pipeline_options)
rows = p | 'read' >> beam.io.ReadFromText(known_args.input)
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
runCode()
def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
self.log.info("Start waiting for DataFlow process to complete.")
while self._proc.poll() is None:
ret = select.select(reads, [], [], 5)
if ret is not None:
for fd in ret[0]:
line = self._line(fd)
self.log.debug(line[:-1])
else:
self.log.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
感谢您的想法并帮助解决我的问题。
此异常源于\u proc
,它是子流程
。它从shell返回一个退出代码。
我还没有使用这个组件。根据正在执行的内容,退出代码2将说明退出的原因。例如,bash中的退出代码意味着:
误用贝壳内置物
并且可能与
缺少关键字或命令,或权限问题
因此,它可能连接到底层数据流配置。尝试在模拟用户时手动执行该文件。
我试图通过气流脚本执行数据流罐。对于它,我使用DataFlowJavaoperator。在参数jar中,我传递本地系统中存在的可执行jar文件的路径。但是当我尝试运行这个工作我得到错误作为 我的脚本是: 我不知道我犯了什么错误,有人能帮我摆脱困境吗
我在Dataflow接口中得到的唯一错误是标准错误消息: 一个工作项尝试了4次,但没有成功。每一次工作人员最终都失去了与服务的联系。 分析Stackdriver日志只显示此错误: 我假设此错误与云运行器有关?但由于我在网上没有找到任何关于这个错误的信息,我想知道是否有人遇到了它,有更好的解释吗? 我使用的是。
我的狗看起来像这样 我的DAG正在执行一个jar文件。jar文件包含运行数据流作业的代码,该作业将数据从GCS写入BQ。jar本身执行成功。 当我尝试执行airflow作业时,我看到以下错误 我做了更多的挖掘,我可以看到气流 正如您可以看到jobs之后的最后一个参数是asia east,因此我觉得airflow job正在尝试使用我在默认参数中提供的区域来搜索数据流job的状态。不确定这是否是正在
我在google sheets中有一个脚本,可以将当前的工作表作为附件发送到电子邮件中。 这个脚本在我看来100%都能正常工作,但是任何其他运行相同脚本的人都会出错。寻找想法或解决方案,这样每个人都可以使用脚本,而不仅仅是我。他们都已授权脚本运行,但仍然存在相同的错误。(脚本下面列出的错误) 其他人收到的错误,每次都对我有效: https://docs.google.com/a/stratusvi
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
我在运行flutter项目时遇到了这个错误:我已经很努力地遵循了其他internet步骤。我不知道为什么我会看到这个问题。你能帮我做这件事吗? 正在以调试模式在为x86构建的Android SDK上启动lib\main.dart...正在运行分级任务“Assemble Debug”... buildscript{