如何在 Apache Airflow Dag 中添加手动任务

Fre*_*dom 6 airflow

我正在使用 Apache Airflow 来管理数据处理管道。在管道的中间,需要在下一步处理之前检查一些数据。例如 ... -> task1 -> human review -> task2 -> ... ,其中task1和task2是数据处理任务。当task1完成后,task1生成的数据需要人工审核。审阅者批准数据后,即可启动任务2。人工审核任务可能需要很长时间(例如几周)。

我正在考虑使用外部数据库来存储人工审核结果。并使用Sensor按时间间隔戳出审核结果。但在审核完成之前,它将占用一名 Airflow 工作人员。

任何想法?

tur*_*erm 5

结合Freedom 的答案Robert Elliot 的答案,这是一个完整的工作示例,它给用户两周的时间来查看第一个任务的结果,然后才会永久失败:

from datetime import timedelta

from airflow.models import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator

from my_tasks import first_task_callable, second_task_callable


TIMEOUT = timedelta(days=14)


def task_to_fail():
    raise AirflowException("Please change this step to success to continue")


dag = DAG(dag_id="my_dag")

first_task = PythonOperator(
    dag=dag,
    task_id="first_task",
    python_callable=first_task_callable
)

manual_sign_off = PythonOperator(
    dag=dag,
    task_id="manual_sign_off",
    python_callable=task_to_fail,
    retries=1,
    max_retry_delay=TIMEOUT
)

second_task = PythonOperator(
    dag=dag,
    task_id="second_task",
    python_callable=second_task_callable
)

first_task >> manual_sign_off >> second_task
Run Code Online (Sandbox Code Playgroud)


小智 1

你的想法对我来说似乎不错。您可以创建专用 DAG 来使用传感器检查审批流程的进度。如果您在传感器上使用较低的超时时间并在此 DAG 上使用适当的时间表,例如每 6 小时一次。使其适应这些任务被批准的频率以及您需要多长时间执行下游任务。