Loc*_*cou 5 python directed-acyclic-graphs airflow
我正在尝试以“每30分钟”的间隔设置两个dag的气流。
第一个dag(提取)如下所示:
extract_table_user_schema >>提取表用户数据>> finish_extract_table_user

第二个dag(transform)看起来像这样:
sensor_wait_for_finish_extract_table_user >>转换表用户>> finish_transform_table_user

对于两个dag,我都将schedule_interval设置为 "*/30 * * * *"
ExternalTaskSensor设置为 execution_delta=timedelta(minutes=30)
我预期的任务流程是:首先运行提取 dag。然后,在完成虚拟任务后,将finish_tranform_table_user触发传感器并运行转换中的任务。
第一次运行良好,但是当我等待第二次运行时,任务会自行中断。我观察到提取 dag被transform_table_user更改某些列名称的任务中断。extract_table_user_data由于列名不再匹配,这将导致失败。
编辑:更确切地说是什么预期和发生的结果
首先,简要介绍每个任务应该执行的操作:
(sql)extract_table_user_schema:从用户表中复制架构
(sql)extract_table_user_data:将数据从源表用户复制到该表的副本中
(虚拟)finish_extract_table_user:虚拟任务
(传感器)sensor_wait_for_finish_extract_table_user:传感器等待提取完成
(sql)transform_table_user:将列字段从重命名id为user_id
(虚拟)finish_transform_table_user:虚拟任务
现在,第一次运行dag,结果是复制的架构+数据和重命名的字段。所有任务均成功完成第二次运行dag时,一项任务失败,而另一项任务未运行。对方都成功。任务extract_table_user_data失败,并显示错误消息;没有这样的领域id。任务finish_extract_table_user未运行
结果表是复制的模式,但没有数据和重命名的字段。
因此,我假设任务transform_table_user是在复制架构后立即运行的。也许布景execution_delta=timedelta(minutes=30)错了吗?也许我误会了传感器的api文档?
帮助将不胜感激!
| 归档时间: |
|
| 查看次数: |
413 次 |
| 最近记录: |