如何测试使用XCom的Apache Airflow任务

Bru*_*ria 5 python apache testing airflow

我试图找出一种测试DAG的方法,其中有两个使用XCom进行通信的任务。

由于控制台命令仅允许我从DAG运行任务,是否有一种方法可以测试通信而不必通过UI运行DAG?

谢谢

Bru*_*ria 10

这是一种对我有用的方法。

尽管 Airflow 网页声明测试命令不会生成或保持任何状态,但按顺序运行气流测试命令仍然有效。

基本上你这样做:

Airflow test my_dag task1 date
Airflow test my_dag task2 date
Run Code Online (Sandbox Code Playgroud)

其中 task1 返回值或使用 xcom_push 方法发送值,然后 task2 运行 xcom_pull 以获取值并继续。

它对我有用。如果您有不同的方法或其他问题,请发表评论。

  • 为了进一步解释为什么这种方法有效,airflow 使用 pickle 序列化 xcom 中的值并将它们存储在数据库中。当你在没有 `--dry_run` 标志的情况下运行 `airflow test` 时,它仍然会将程序运行的一些信息保存到数据库中,包括 `task1` 的 xcom。如果您希望数据库不包含大量测试运行,这将是有问题的。对我们来说不幸的是,在撰写本文时,气流并没有真正模拟 xcom 的好方法,因此这(或重写您的运算符以在测试期间不使用 xcom)是您的选择。 (2认同)