Дми*_*рин 6 python airflow airflow-scheduler
同事们,我们需要帮助。有两个dags Parent和Child,parent有自己的时间表,假设'30 * * * *',child'1 8-17 ** 1-5',child等待parent执行,例如40分钟,如果parent以错误结束,那么子类也会因错误而崩溃,否则执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不了解如何同步它们。我写了这样的代码:
\n达格父母
\nimport time\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor import ExternalTaskSensor, ExternalTaskMarker\n\nstart_date = datetime(2021, 3, 1, 20, 36, 0)\n\nclass Exept(Exception):\n pass\n\ndef wait():\n time.sleep(3)\n with open('etl.txt', 'r') as txt:\n line = txt.readline()\n if line == 'err':\n print(1)\n raise Exept\n return 'etl success'\n\n\nwith DAG(\n dag_id="dag_etl1",\n start_date=start_date,\n schedule_interval='* * * * *',\n tags=['example2'],\n) as etl1:\n parent_task = ExternalTaskMarker(\n task_id="parent_task",\n external_dag_id="dag_etl1",\n external_task_id="etl_child",\n )\n wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)\n \n wait_timer >> parent_task\nRun Code Online (Sandbox Code Playgroud)\n达格孩子
\nfrom datetime import datetime, timedelta\n\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor import ExternalTaskSensor, ExternalTaskMarker\n\nfrom etl_parent import etl1, wait_timer, parent_task\n\nstart_date = datetime(2021, 3, 1, 20, 36, 0)\n\ndef check():\n return 'I succeeded'\n\nwith DAG(\n dag_id='etl_child', \n start_date=start_date, \n schedule_interval='* * * * *',\n tags = ['testing_child_dag']\n) as etl_child:\n status = ExternalTaskSensor(\n task_id="dag_etl1",\n external_dag_id=etl1.dag_id,\n external_task_id=parent_task.task_id,\n allowed_states=['success'],\n mode='reschedule',\n execution_delta=timedelta(minutes=1),\n timeout=60,\n )\n\n task1 = PythonOperator(task_id='task1', python_callable=check)\n \n status >> task1\nRun Code Online (Sandbox Code Playgroud)\n正如您所看到的,我试图模拟如果在文本文件中指定 err 则父任务失败并在任何其他情况下成功的情况。但这根本不像我预期的那样工作,在 dag 第一次启动时一切都很好,它工作正常,如果我更改文本文件中的数据,那么父任务可以正常工作,例如,我启动父任务dag 有一个故意的错误,一切都会正常工作,子类将以错误结束,但如果我更改文本,父类将再次正常工作,但子类将继续下降一段时间,那么它可能是正确的,但不是事实。如果已知发射成功,情况是一样的,完全相反。另外,我不明白如何在父 dag 中组织等待任务完成。
\n请帮助)我最近一直在使用气流,我可能遗漏了一些东西。
\n您的 DAG(DAG A 和 DAG B)似乎有两个完全不同的时间表。如果您想避免并发修改问题,并断言 DAG A 当前未在触发 DAG B 之前运行,您可能需要使用ExternalTaskSensor。
在这种情况下,您可以使用以下execution_date_fn参数:
ExternalTaskSensor(
task_id="sensor",
external_dag_id="dag_id",
execution_date_fn=get_most_recent_dag_run,
mode="reschedule")
Run Code Online (Sandbox Code Playgroud)
该get_most_recent_dag_run函数如下所示:
from airflow.models import DagRun
def get_most_recent_dag_run(dt):
dag_runs = DagRun.find(dag_id="dag_id")
dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
if dag_runs:
return dag_runs[0].execution_date
Run Code Online (Sandbox Code Playgroud)
将get_most_recent_dag_run查找所提供的dag_id的最后一次 DAG 运行,从而允许ExternalSensor 工作。
ExternalTaskSensor 出现问题的最常见原因是execution_delta 参数,所以我将从这里开始。
我看到父 dag 和子 dag 具有完全相同的 start_date 和 Schedule_interval,但您的execution_delta 是 1 分钟。在这种情况下,您的子 dag 会查找在 20:35 开始的父 dag(在您的示例中),但它实际上是在 20:36 开始的,因此失败。即使为了测试,尝试将您的父 dag 设置为在 20:35 开始,看看是否可以解决问题。
这是一篇很好的文章,详细介绍了 Schedule_interval 陷阱https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142
关于等待时间,这是ExternalTaskSensor 中的超时参数。在您的情况下,它会等待 60 秒才会失败。就我个人而言,我会非常谨慎地设置较长的超时时间。当您的传感器等待时,它会占用一个工作线程,因此其他任务无法使用它,这可能会导致您的其他任务被锁定而无法执行,尤其是当您有很多传感器时。
| 归档时间: |
|
| 查看次数: |
7946 次 |
| 最近记录: |