Airflow - BashOperator:获取“Dag 运行已为 DAG 死锁:...”错误

Kim*_*oll 2 python concurrency workflow airflow

真的很喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到了错误:“ {jobs.py:538} ERROR - Dag running is deadlocked for DAG: TEST_SCHEDULER_DAG ”。

这是一个新的气流安装 (v1.7.1.3),我已经能够很好地运行其他预定的 dag 文件。我的环境是 Linux (ubuntu 16.04)、python 2.7.12 (anaconda)、postgresql 9.5.5,并且使用 LocalExecutor。

我遇到死锁错误的 DAG 是:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'owner.name',
    'depends_on_past': True,
    'start_date': datetime(2016, 11, 30, 8, 0, 0),
    'retries': 0,
    'retry_delay': timedelta(seconds=60),
}

tst_dag = DAG(dag_id='TEST_SCHEDULER_DAG',
              default_args=default_args,
              schedule_interval='10 * * * *')

t1 = BashOperator(
    task_id='task_1',
    bash_command='sleep 10',
    dag=tst_dag)

t2 = BashOperator(
    task_id='task_2',
    bash_command='sleep 10',                  
    dag=tst_dag)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

同样,第一次执行运行良好,但所有后续执行 (DagRun) 都显示为“失败”,并且我在控制台上看到了“死锁”错误。

谢谢!

Moe*_*ars 5

@Gergely 的回应帮助了我。我试图为超过 DAG 上指定的 end_date 的日期运行气流回填。一旦我更改了 DAG 对象上的 end_date 以包括我回填的日期,它就起作用了。