同事们,我们需要帮助。有两个dags Parent和Child,parent有自己的时间表,假设'30 * * * *',child'1 8-17 ** 1-5',child等待parent执行,例如40分钟,如果parent以错误结束,那么子类也会因错误而崩溃,否则执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不了解如何同步它们。我写了这样的代码:
\n达格父母
\nimport time\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor import ExternalTaskSensor, ExternalTaskMarker\n\nstart_date = datetime(2021, 3, 1, 20, 36, 0)\n\nclass Exept(Exception):\n pass\n\ndef wait():\n time.sleep(3)\n with open('etl.txt', 'r') as txt:\n line = txt.readline()\n if line == 'err':\n print(1)\n raise Exept\n return 'etl success'\n\n\nwith DAG(\n dag_id="dag_etl1",\n start_date=start_date,\n schedule_interval='* * * * *',\n tags=['example2'],\n) as etl1:\n parent_task = ExternalTaskMarker(\n task_id="parent_task",\n external_dag_id="dag_etl1",\n external_task_id="etl_child",\n )\n wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)\n \n wait_timer …Run Code Online (Sandbox Code Playgroud)