动态构建集合以在Airflow dag中循环

mok*_*oku 8 python airflow

我最近一直在使用Airflow,并且找到一个非常常见的模式是循环一些集合以创建多个任务.非常类似于github中示例dags文件夹中的example_python_operator.py dag.

我的问题与动态构建循环迭代的集合有关.假设您要为存储在数据库中的每个未知客户端创建一个任务,并且您计划将它们作为填充列表的方法进行查询.像这样的东西:

first_task = PythonOperator(
    task_id='some_upstream_task',
    provide_context=True,
    python_callable=some_upstream_task,
    dag=dag)

clients = my_database_query()

for client in clients:
    task = PythonOperator(
        task_id='client_' + str(client),
        python_callable=some_function,
        dag=dag)

    task.set_upstream(first_task)
Run Code Online (Sandbox Code Playgroud)

从我所看到的情况来看,这意味着即使您的dag每周只运行一次,您的数据库也会每30秒为这些客户端进行一次轮询.即使您从迭代器设置了一个上游运算符并通过xcoms返回客户端,并且每隔30秒替换my_database_query()一次xcom_pull()仍然轮询的xcoms.这对我来说似乎很浪费,所以我想知道这种类型的dag是否有更好的模式?

lou*_*ton 1

在您的代码示例中,我们没有看到 DAG 的计划间隔,但我假设您已经安排了它@daily,并且您希望数据库查询每天运行一次。

在 Airflow 中,调度程序定期解析 DAG(因此称为“每 30 秒”)。所以你的 python 代码会引起问题。

在你的情况下,我会考虑改变视角:为什么不尝试在 PosgresOperator链接中运行数据库查询,然后将该部分作为 DAG 的一部分?根据该 Operator 的输出(例如,您可以通过 XCOM 或通过对象存储中的文件传播),您可以拥有一个下游的 PythonOperator,它不会为一个客户端运行函数,而是为所有客户端运行函数。