当前位置: 首页 > 面试题库 >

Python Airflow-从PythonOperator返回结果

周伟泽
2023-03-14
问题内容

我已经用多个PythonOperator编写了DAG

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                          provide_context=True,
                          python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return(kwargs['dag_run'].conf.get('file'))

我从PythonOperator调用“ Task1”方法。该方法正在返回一个值,该值我需要传递给下一个PythonOperator。如何从“
task1”变量中获取值,或者如何从Task1方法中返回该值?

更新 :

    def Task1(**kwargs):
          file_name = kwargs['dag_run'].conf.get[file]
          task_instance = kwargs['task_instance']
          task_instance.xcom_push(key='file', value=file_name) 
          return file_name

  t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)

  t2 =   BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
      dag=dag,
    )

t2.set_upstream(t1)

问题答案:

您可能想查看Airflow的XCOM:https :
//airflow.apache.org/concepts.html#xcoms

如果从函数返回值,则此值存储在xcom中。就您而言,您可以像从其他Python代码一样访问它:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')

或像这样的模板中:

{{ task_instance.xcom_pull(task_ids='Task1') }}

如果要指定键,可以将其推入XCOM(在任务内):

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

然后,您可以像下面这样访问它:

task_instance.xcom_pull(task_ids='my_task', key='the_key')

编辑1

后续问题: 我不能在其他函数中使用该值,而是将其传递给另一个PythonOperator,例如-“ t2 =” BashOperator(task_id
=’Moving_bucket’,bash_command =’python /home/raw.py“%s” ‘%file_name,dag =
dag)“-我想访问“ Task1”返回的file_name。如何实现?

首先,在我看来,该值实际上 不是 传递给另一个,PythonOperator而是传递给BashOperator

其次,这已经在我上面的回答中涵盖了。该字段bash_command是模板化的(请参阅template_fields源代码:https :
//github.com/apache/incubator-
airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板版本:

BashOperator(
  task_id='Moving_bucket', 
  bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
  dag=dag,
)

编辑2

说明:Airflow的工作方式如下:它将执行Task1,然后填充xcom,然后执行下一个任务。因此,为使您的示例正常工作,您需要先执行Task1,然后在Task1的下游执行Moving_bucket。

由于您使用的是返回函数,因此您也可以省略key='file'fromxcom_pull而不是在函数中手动设置它。



 类似资料:
  • 问题内容: 我有以下从数据库获取十六进制代码的函数 我的问题是我在回调函数中返回了结果,但getColour函数未返回任何内容。我希望getColour函数返回的值。 在我调用getColour的那一刻,它不返回任何内容 我尝试做类似的事情 但当然SELECT查询在返回值时已经完成 问题答案: 您只需要对回调中的db查询结果进行处理。就像。

  • 问题内容: 我正在使用一个调用python脚本的VBA代码。我可以将参数发送到python脚本并使用读取。 在python代码中,我有一个函数,该函数接受给定的参数并返回一个值。 请问如何在VBA中获得返回值? 问题答案: 考虑使用VBA Shell捕获输出线流。确保打印Python脚本以筛选值: 蟒蛇 VBA (下面是字符串输出)

  • 问题内容: 我正在尝试创建一种方法,从中可以查询数据库并检索整个表。 目前,如果我使用这些数据只是正常工作 中 的方法。但是,我希望该方法返回结果。 我正在了解当前代码。 我该如何实现? 问题答案: 您永远不要通过公共方法来回避。这很容易导致资源泄漏,因为您不得不保持语句和连接打开。关闭它们将隐式关闭结果集。但是,将它们保持打开状态将导致它们悬而未决,并且当它们打开过多时,将导致数据库用尽资源。

  • 问题内容: 我在自己的类中拥有所有异步调用,因此我不想将aync’ly设置为全局变量。为此,我想从我的asunc postProcess方法返回对象,例如字符串。 能做到吗? 下面是我类的一般结构,例如,我想从onPostExecute()返回一个字符串。我看到在其他地方提到了委托,但这似乎很混乱,确定有办法为类或方法提供返回类型吗? 问题答案: 像下面这样 和听众课 你可以这样打电话

  • 我有一个简单的代码,可以从第14列开始将转置的范围复制到另一张表的最后一行 它按原样返回零结果。如果我将destrow从公式更改为simple 2(这是现在最后一个空行),则效果很好。为什么不返回目标工作表中的最后一行索引?

  • 我正在尝试处理新的AndroidLollipopMediaProjection API。 我发现(至少在我的股票三星Galaxy S4 jfltexx上),当我开始意图获取捕获屏幕的权限()时,除非我在前面的尝试中选中了“不要再次询问”,否则在中不会有结果... 和结果处理: 权限对话框显示得很好,但是我的活动被隐藏了,它永远不会转到。 知道出什么问题了吗?