我最近一直在使用Airflow,并且找到一个非常常见的模式是循环一些集合以创建多个任务.非常类似于github中示例dags文件夹中的example_python_operator.py dag.
我的问题与动态构建循环迭代的集合有关.假设您要为存储在数据库中的每个未知客户端创建一个任务,并且您计划将它们作为填充列表的方法进行查询.像这样的东西:
first_task = PythonOperator(
task_id='some_upstream_task',
provide_context=True,
python_callable=some_upstream_task,
dag=dag)
clients = my_database_query()
for client in clients:
task = PythonOperator(
task_id='client_' + str(client),
python_callable=some_function,
dag=dag)
task.set_upstream(first_task)
Run Code Online (Sandbox Code Playgroud)
从我所看到的情况来看,这意味着即使您的dag每周只运行一次,您的数据库也会每30秒为这些客户端进行一次轮询.即使您从迭代器设置了一个上游运算符并通过xcoms返回客户端,并且每隔30秒替换my_database_query()一次xcom_pull()仍然轮询的xcoms.这对我来说似乎很浪费,所以我想知道这种类型的dag是否有更好的模式?
在您的代码示例中,我们没有看到 DAG 的计划间隔,但我假设您已经安排了它@daily,并且您希望数据库查询每天运行一次。
在 Airflow 中,调度程序定期解析 DAG(因此称为“每 30 秒”)。所以你的 python 代码会引起问题。
在你的情况下,我会考虑改变视角:为什么不尝试在 PosgresOperator链接中运行数据库查询,然后将该部分作为 DAG 的一部分?根据该 Operator 的输出(例如,您可以通过 XCOM 或通过对象存储中的文件传播),您可以拥有一个下游的 PythonOperator,它不会为一个客户端运行函数,而是为所有客户端运行函数。