当前位置: 首页 > 知识库问答 >
问题:

如何使用python中的DAG,仅当AWS athena表中的新分区/数据可用时才触发气流任务?

柳弘方
2023-03-14

我有一个如下的场景:

    < li >仅当源表(Athena)中有新数据可用时,才触发< code >任务1和< code >任务2。当一天中有新数据分区时,应触发Task1和Task2。 < li >仅在完成< code >任务1和< code >任务2时触发< code >任务3 < li >仅在< code >任务3完成时触发< code >任务4

我的代码

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

实现它的最佳方法是什么?

共有1个答案

暴乐邦
2023-03-14

我相信你的问题解决了两个主要问题:

    < li >忘记以明确的方式配置< code>schedule_interval,因此@daily正在设置一些您没有预料到的内容。 < li >当您依赖外部事件来完成执行时,如何正确地触发和重试dag的执行

简单答案:用cron作业格式明确设置scheduleinterval,并使用传感器操作符不时检查

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- html" target="_blank">set a poke_time in seconds
    dag=dag)

其中 startime 是您的日常任务开始的时间,结束时间是您应该在标记为失败之前检查事件是否完成的一天中的最后一个时间poke_time是您的sensor_operator检查事件是否发生的间隔。

当您像以前一样将dag设置为< code>@daily时,如何显式处理cron作业:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

从文档中,你可以看到你实际上在做:@daily-每天午夜运行一次

这就解释了为什么会出现超时错误,并在5分钟后失败,因为您设置了“retries”:1'retry_delay':timedelta(minutes=5)。所以它试图在午夜运行dag,但失败了。5分钟后再次重试,但再次失败,因此将其标记为失败。

所以基本上@day run正在设置一个隐式cron作业:

@daily -> Run once a day at midnight -> 0 0 * * *

cron 作业格式采用以下格式,每当您想说“全部”时,您可以将值设置为 *

分钟小时Day_of_Month月Day_of_Week

所以@day基本上是说每隔:分钟0小时0所有days_of_month所有月份的所有days_of_week

因此,您的案例每隔以下时间运行:分钟0小时10分钟/月/月/周/日。这在cron作业格式中转化为:

0 10 * * *

当您依赖外部事件来完成执行时,如何正确触发和重试dag的执行

>

  • 您可以使用命令airflowtriggerdag从外部事件触发气流中的dag。如果您能够触发一个lambda函数/python脚本来针对您的气流实例,这将是可能的。

    如果无法在外部触发 dag,请使用像 OP 那样的传感器运算符,为其设置poke_time并设置合理的重试次数。

  •  类似资料:
    • 我有一个flink流媒体作业,它从Kafka读取数据并写入文件系统中适当的分区。例如,作业被配置为使用一个bucketing接收器,该接收器写入/数据/日期=${date}/小时=${hour}。 如何检测分区是否已准备好使用,以便相应的气流管道可以在这一小时内进行批处理?

    • 我在Google Cloud Composer中从Airflow调用数据流作业, a、 b和c是调用数据流作业的任务。我只想在数据流作业完成后运行b,问题是它们都同时运行。 我怎么能等到之前的工作完成?

    • 我使用的是气流 1.7.1.3。 我有一个并发DAG/任务的问题。当DAG运行时,调度程序不再启动其他DAG。似乎调度程序完全冻结(不再有日志)……直到运行的DAG完成。然后,新的DAGrun被触发。我的不同任务是长时间运行的ECS任务(约10分钟) 我使用了,并允许默认配置dag_concurrency=16。我使用并自动重新启动它,并为我的所有DAG声明设置 而我只有2个CPU可用。 有什么想

    • 问题内容: 由于声誉的限制,这是对先前问题的回答的后续问题。 但是想象一下一个有变化的列的大桌子。您必须比较每列,如果数据库发生更改,则必须调整触发器。而且比较硬编码的每一行都没有“感觉”好:) 是的,但这就是进行的方式。 附带说明一下,在更新之前先检查一下也是一种好习惯: 在您的示例中,这将使其更新(从而覆盖)两行而不是三行。 我想知道在处理NULL值时是否有更有效的方法来测试每个字段的更改。

    • 问题内容: 仅在真正更改数据的情况下,才有可能使用“更新后”触发器。我知道“新旧”。但是使用它们时,我只能比较列。例如“ NEW.count <> OLD.count”。 但我想要类似的东西:如果“ NEW <> OLD”,则运行触发器 一个例子: 关键是,有一个更新,但是 什么都没有改变 。但是无论如何,触发器都在运行。恕我直言,应该有一个没有的方法。 我知道我可以使用 如果现在b <> OLD

    • 我的狗看起来像这样 我的DAG正在执行一个jar文件。jar文件包含运行数据流作业的代码,该作业将数据从GCS写入BQ。jar本身执行成功。 当我尝试执行airflow作业时,我看到以下错误 我做了更多的挖掘,我可以看到气流 正如您可以看到jobs之后的最后一个参数是asia east,因此我觉得airflow job正在尝试使用我在默认参数中提供的区域来搜索数据流job的状态。不确定这是否是正在