使用ExternalTask​​Sensor的气流计划间隔

Loc*_*cou 5 python directed-acyclic-graphs airflow

我正在尝试以“每30分钟”的间隔设置两个dag的气流。

第一个dag(提取)如下所示:

extract_table_user_schema >>提取表用户数据>> finish_extract_table_user

提取dag

第二个dag(transform)看起来像这样:

sensor_wait_for_finish_extract_table_user >>转换表用户>> finish_transform_table_user

转换dag

对于两个dag,我都将schedule_interval设置为 "*/30 * * * *"

ExternalTask​​Sensor设置为 execution_delta=timedelta(minutes=30)

我预期的任务流程是:首先运行提取 dag。然后,在完成虚拟任务后,将finish_tranform_table_user触发传感器并运行转换中的任务。

第一次运行良好,但是当我等待第二次运行时,任务会自行中断。我观察到提取 dag被transform_table_user更改某些列名称的任务中断。extract_table_user_data由于列名不再匹配,这将导致失败。

编辑:更确切地说是什么预期和发生的结果

首先,简要介绍每个任务应该执行的操作:

(sql)extract_table_user_schema:从用户表中复制架构

(sql)extract_table_user_data:将数据从源表用户复制到该表的副本中

(虚拟)finish_extract_table_user:虚拟任务

(传感器)sensor_wait_for_finish_extract_table_user:传感器等待提取完成

(sql)transform_table_user:将列字段从重命名iduser_id

(虚拟)finish_transform_table_user:虚拟任务

现在,第一次运行dag,结果是复制的架构+数据和重命名的字段。所有任务均成功完成第二次运行dag时,一项任务失败,而另一项任务未运行。对方都成功。任务extract_table_user_data失败,并显示错误消息;没有这样的领域id。任务finish_extract_table_user未运行

结果表是复制的模式,但没有数据重命名的字段。

因此,我假设任务transform_table_user是在复制架构后立即运行的。也许布景execution_delta=timedelta(minutes=30)错了吗?也许我误会了传感器的api文档?

帮助将不胜感激!