仅当 AWS athena 表中的新分区/数据可用时,如何使用 python 中的 DAG 触发 Airflow 任务?

pan*_*kaj 5 python directed-acyclic-graphs airflow amazon-athena airflow-scheduler

我有一个像下面这样的场景:

  1. 仅当源表 (Athena) 中有新数据可用时才触发 aTask 1和。Task 2当一天中有新的数据分区时,应该触发任务 1 和任务 2。
  2. 仅在和Task 3完成时触发Task 1Task 2
  3. 仅触发Task 4完成Task 3

在此输入图像描述

我的代码

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)
Run Code Online (Sandbox Code Playgroud)

实现它的最佳方式是什么?

Ber*_*sen 1

我相信你的问题解决了两个主要问题:

  1. 忘记以显式方式配置schedule_interval,因此@daily正在设置您不期望的东西。
  2. 当您依赖外部事件来完成执行时,如何正确触发和重试 dag 的执行

简短的答案:使用 cron 作业格式显式设置 Schedule_interval 并使用传感器操作员不时检查

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

您的日常任务将在startime什么时间开始,endtime一天中您应该在标记为失败之前检查事件是否已完成的最后一个时间,以及您将检查事件是否发生的时间poke_time间隔。sensor_operator

每当您将 dag 设置为像您所做的那样时,如何显式地处理 cron 作业@daily

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Run Code Online (Sandbox Code Playgroud)

文档中,您可以看到您实际上在做: @daily - Run once a day at midnight

现在就可以理解为什么您会收到超时错误,并且会在 5 分钟后失败,因为您设置了'retries': 1'retry_delay': timedelta(minutes=5)。所以它尝试在午夜运行 dag,但失败了。5分钟后再次重试并再次失败,因此标记为失败。

所以基本上 @daily run 正在设置一个隐式 cron 作业:

@daily -> Run once a day at midnight -> 0 0 * * *
Run Code Online (Sandbox Code Playgroud)

cron 作业格式采用以下格式,您可以将值设置为*您想要说的“全部”。

Minute Hour Day_of_Month Month Day_of_Week

所以 @daily 基本上是说每隔: 所有 days_of_month 所有月份的所有 days_of_week 的 分钟 0 小时 0

因此,您的案例每隔以下时间运行一次:所有天数的所有天数的第 0 小时 10 分钟,所有天数的所有月的第 0 小时 10 分。这以 cron 作业格式翻译为:

0 10 * * *
Run Code Online (Sandbox Code Playgroud)

当您依赖外部事件来完成执行时,如何正确触发和重试 dag 的执行

  1. 您可以使用命令从外部事件触发气流中的 dag airflow trigger_dag。如果您如何触发 lambda 函数/ python 脚本来定位您的气流实例,这将是可能的。

  2. 如果您无法从外部触发 d​​ag,请像 OP 一样使用传感器操作员,为其设置 poke_time 并设置合理的高重试次数。