AirFlow DAG陷入运行状态

Saa*_*rat 4 airflow

我创建了一个dag,并每天进行调度。它每天都会排队,但任务实际上并未运行。这个问题过去曾在这里提出过但是答案并没有帮助我,因此似乎还有另一个问题。

我的代码在下面共享。我用注释替换了任务t2的SQL。当我使用“ airflow test ...”在CLI上分别运行它们时,每个任务都成功运行。

您能解释一下如何使DAG运行吗?谢谢!

这是DAG代码:

from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator



default_args = {
    'owner' : 'me',
    'depends_on_past' : 'true',
    'start_date' : datetime(2018, 06, 25),
    'email' : ['myemail@moovit.com'],
    'email_on_failure':True,
    'email_on_retry':False,
    'retries' : 2,
    'retry_delay' : timedelta(minutes=5)
}


dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)



t1 = BigQueryOperator(
    task_id='bq_delete_my_agg_table',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    delete `my_project.agg.my_agg_table`
    where date = '{{ macros.ds_add(ds, -1)}}'
    ''',
    dag=dag)

t2 = BigQueryOperator(
    task_id='bq_insert_my_agg_table',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    allow_large_results=True,
    bql='''
    #standardSQL
    Select ... the query continue here.....
    ''',    destination_dataset_table='my_project.agg.my_agg_table',
    dag=dag)


t1 >> t2
Run Code Online (Sandbox Code Playgroud)

tob*_*bi6 7

通常很容易找出未运行任务的原因。在Airflow Web UI中时:

  • 选择任何感兴趣的DAG
  • 现在点击任务
  • 再次单击 Task Instance Details
  • 第一行有一个面板 Task Instance State
  • 在其Reason旁边的框中是运行任务的原因或忽略任务的原因

通常,检查未执行的第一个任务是很有意义的,因为我看到您已经进行了设置depends_on_past=True,如果在错误的情况下使用它会导致问题。

此处的更多信息:Airflow 1.9.0正在排队但未启动任务