小编Дми*_*рин的帖子

气流:ExternalTask​​Sensor 未按预期工作。不同的任务计划

同事们,我们需要帮助。有两个dags Parent和Child,parent有自己的时间表,假设'30 * * * *',child'1 8-17 ** 1-5',child等待parent执行,例如40分钟,如果parent以错误结束,那么子类也会因错误而崩溃,否则执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不了解如何同步它们。我写了这样的代码:

\n

达格父母

\n
import time\n\nfrom airflow import DAG\nfrom airflow.operators.dummy_operator import DummyOperator\nfrom airflow.operators.python_operator import PythonOperator\nfrom airflow.sensors.external_task_sensor  import ExternalTaskSensor, ExternalTaskMarker\n\nstart_date =  datetime(2021, 3, 1, 20, 36, 0)\n\nclass Exept(Exception):\n    pass\n\ndef wait():\n    time.sleep(3)\n    with open('etl.txt', 'r') as txt:\n        line = txt.readline()\n        if line == 'err':\n            print(1)\n            raise Exept\n    return 'etl success'\n\n\nwith DAG(\n    dag_id="dag_etl1",\n    start_date=start_date,\n    schedule_interval='* * * * *',\n    tags=['example2'],\n) as etl1:\n    parent_task = ExternalTaskMarker(\n        task_id="parent_task",\n        external_dag_id="dag_etl1",\n        external_task_id="etl_child",\n    )\n    wait_timer = PythonOperator(task_id='wait_timer', python_callable=wait)\n    \n    wait_timer …
Run Code Online (Sandbox Code Playgroud)

python airflow airflow-scheduler

6
推荐指数
2
解决办法
7946
查看次数

标签 统计

airflow ×1

airflow-scheduler ×1

python ×1