我正在使用 Apache Airflow 来管理数据处理管道。在管道的中间,需要在下一步处理之前检查一些数据。例如
... -> task1 -> human review -> task2 -> ...
,其中task1和task2是数据处理任务。当task1完成后,task1生成的数据需要人工审核。审阅者批准数据后,即可启动任务2。人工审核任务可能需要很长时间(例如几周)。
我正在考虑使用外部数据库来存储人工审核结果。并使用Sensor按时间间隔戳出审核结果。但在审核完成之前,它将占用一名 Airflow 工作人员。
任何想法?
结合Freedom 的答案和Robert Elliot 的答案,这是一个完整的工作示例,它给用户两周的时间来查看第一个任务的结果,然后才会永久失败:
from datetime import timedelta
from airflow.models import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
from my_tasks import first_task_callable, second_task_callable
TIMEOUT = timedelta(days=14)
def task_to_fail():
raise AirflowException("Please change this step to success to continue")
dag = DAG(dag_id="my_dag")
first_task = PythonOperator(
dag=dag,
task_id="first_task",
python_callable=first_task_callable
)
manual_sign_off = PythonOperator(
dag=dag,
task_id="manual_sign_off",
python_callable=task_to_fail,
retries=1,
max_retry_delay=TIMEOUT
)
second_task = PythonOperator(
dag=dag,
task_id="second_task",
python_callable=second_task_callable
)
first_task >> manual_sign_off >> second_task
Run Code Online (Sandbox Code Playgroud)
小智 1
你的想法对我来说似乎不错。您可以创建专用 DAG 来使用传感器检查审批流程的进度。如果您在传感器上使用较低的超时时间并在此 DAG 上使用适当的时间表,例如每 6 小时一次。使其适应这些任务被批准的频率以及您需要多长时间执行下游任务。
| 归档时间: |
|
| 查看次数: |
7305 次 |
| 最近记录: |