如何在Airflow上重新启动失败的任务

Che*_*n J 25 python hadoop bigdata airflow apache-airflow

我使用的是LocalExecutor,我的dag有3个任务,其中任务(C)依赖于任务(A).任务(B)和任务(A)可以并行运行,如下所示

A - >Ç

所以任务(A)失败了,但任务(B)运行正常.任务(C)尚未运行,因为任务(A)失败.

我的问题是我如何单独运行任务(A),因此任务(A)运行一旦任务(A)完成,并且Airflow UI将它们标记为成功.

jhn*_*lvr 52

在UI中:

  1. 转到dag,然后想要改变你想要改变的跑步
  2. 单击GraphView
  3. 单击任务A.
  4. 点击"清除"

这将让任务A再次运行,如果成功,任务C应该运行.这是有效的,因为当您清除任务的状态时,调度程序会将其视为之前未运行此dag运行.

  • 也可以使用命令行:`airflow clear -s <start_date> -e <end_date> -t task_a <dag_name>` (10认同)
  • 是否可以在代码中包含此内容?如果任务失败,哪个将在固定时间后检查并尝试清除它? (3认同)
  • 非常感谢!UI和Command系列都适合我! (2认同)
  • @TomasJansson 它将使用与原始执行时间相同的执行时间。但是您的 start_date 将具有新值(它将具有当前时间戳)。您可以在“任务实例详细信息”屏幕中看到所有内容。 (2认同)

phe*_*der 5

这是一个替代解决方案,您可以在其中清除并自动重试某些任务。如果您只想清除某个任务,则不会使用 -d (下游)标志:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date}  -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}


with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )

    t1 = DummyOperator(
        task_id='t1'
    )

    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
Run Code Online (Sandbox Code Playgroud)