气流:ExternalTask​​Sensor 未按预期工作。不同的任务计划

Дми*_*рин 6 python airflow airflow-scheduler

同事们,我们需要帮助。有两个dags Parent和Child,parent有自己的时间表,假设'30 * * * *',child'1 8-17 ** 1-5',child等待parent执行,例如40分钟,如果parent以错误结束,那么子类也会因错误而崩溃,否则执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不了解如何同步它们。我写了这样的代码:

\n

达格父母

\n
import time\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor  import ExternalTaskSensor, ExternalTaskMarker\n\nstart_date =  datetime(2021, 3, 1, 20, 36, 0)\n\nclass Exept(Exception):\n    pass\n\ndef wait():\n    time.sleep(3)\n    with open('etl.txt', 'r') as txt:\n        line = txt.readline()\n        if line == 'err':\n            print(1)\n            raise Exept\n    return 'etl success'\n\n\nwith DAG(\n    dag_id="dag_etl1",\n    start_date=start_date,\n    schedule_interval='* * * * *',\n    tags=['example2'],\n) as etl1:\n    parent_task = ExternalTaskMarker(\n        task_id="parent_task",\n        external_dag_id="dag_etl1",\n        external_task_id="etl_child",\n    )\n    wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)\n    \n    wait_timer >> parent_task\n
Run Code Online (Sandbox Code Playgroud)\n

达格孩子

\n
from datetime import datetime, timedelta\n\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor  import ExternalTaskSensor, ExternalTaskMarker\n\nfrom etl_parent import etl1, wait_timer, parent_task\n\nstart_date =  datetime(2021, 3, 1, 20, 36, 0)\n\ndef check():\n    return 'I succeeded'\n\nwith DAG(\n    dag_id='etl_child', \n    start_date=start_date, \n    schedule_interval='* * * * *',\n    tags = ['testing_child_dag']\n) as etl_child:\n    status = ExternalTaskSensor(\n        task_id="dag_etl1",\n        external_dag_id=etl1.dag_id,\n        external_task_id=parent_task.task_id,\n        allowed_states=['success'],\n        mode='reschedule',\n        execution_delta=timedelta(minutes=1),\n        timeout=60,\n    )\n\n    task1 = PythonOperator(task_id='task1', python_callable=check)\n    \n    status >> task1\n
Run Code Online (Sandbox Code Playgroud)\n

正如您所看到的,我试图模拟如果在文本文件中指定 err 则父任务失败并在任何其他情况下成功的情况。但这根本不像我预期的那样工作,在 dag 第一次启动时一切都很好,它工作正常,如果我更改文本文件中的数据,那么父任务可以正常工作,例如,我启动父任务dag 有一个故意的错误,一切都会正常工作,子类将以错误结束,但如果我更改文本,父类将再次正常工作,但子类将继续下降一段时间,那么它可能是正确的,但不是事实。如果已知发射成功,情况是一样的,完全相反。另外,我不明白如何在父 dag 中组织等待任务完成。

\n

请帮助)我最近一直在使用气流,我可能遗漏了一些东西。

\n

Nah*_* O. 6

您的 DAG(DAG A 和 DAG B)似乎有两个完全不同的时间表。如果您想避免并发修改问题,并断言 DAG A 当前未在触发 DAG B 之前运行,您可能需要使用ExternalTask​​Sensor。

在这种情况下,您可以使用以下execution_date_fn参数:

ExternalTaskSensor(
    task_id="sensor",
    external_dag_id="dag_id",
    execution_date_fn=get_most_recent_dag_run,
    mode="reschedule")
Run Code Online (Sandbox Code Playgroud)

get_most_recent_dag_run函数如下所示:

from airflow.models import DagRun

def get_most_recent_dag_run(dt):
    dag_runs = DagRun.find(dag_id="dag_id")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date
Run Code Online (Sandbox Code Playgroud)

get_most_recent_dag_run查找所提供的dag_id的最后一次 DAG 运行,从而允许ExternalSensor 工作。


Mic*_*kov 2

ExternalTask​​Sensor 出现问题的最常见原因是execution_delta 参数,所以我将从这里开始。

我看到父 dag 和子 dag 具有完全相同的 start_date 和 Schedule_interval,但您的execution_delta 是 1 分钟。在这种情况下,您的子 dag 会查找在 20:35 开始的父 dag(在您的示例中),但它实际上是在 20:36 开始的,因此失败。即使为了测试,尝试将您的父 dag 设置为在 20:35 开始,看看是否可以解决问题。

这是一篇很好的文章,详细介绍了 Schedule_interval 陷阱https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142

关于等待时间,这是ExternalTask​​Sensor 中的超时参数。在您的情况下,它会等待 60 秒才会失败。就我个人而言,我会非常谨慎地设置较长的超时时间。当您的传感器等待时,它会占用一个工作线程,因此其他任务无法使用它,这可能会导致您的其他任务被锁定而无法执行,尤其是当您有很多传感器时。