Airflow - 如何在 for 循环的迭代之间设置任务依赖关系?

enm*_*myj 3 python etl airflow

我正在使用 Airflow 在 for 循环中运行一组任务。循环的目的是遍历数据库表名列表并执行以下操作:

for table_name in list_of_tables:
    if table exists in database (BranchPythonOperator)
        do nothing (DummyOperator)
    else:
        create table (JdbcOperator)
    insert records into table (JdbcOperator, Trigger on One Success)
Run Code Online (Sandbox Code Playgroud)

在 Web UI 上,这看起来像:

for循环中的任务

目前,Airflow 是从上到下然后从左到右执行此图中的任务,例如:tbl_exists_fake_table_one--> tbl_exists_fake_table_two-->tbl_create_fake_table_one等。

但是,insert声明 forfake_table_two取决于fake_table_one正在更新,Airflow 当前未捕获该依赖项。(从技术上讲,这种依赖关系是由 的顺序捕获的list_of_table_names,但我相信在更复杂的情况下这会容易出错)

我希望与fake_table_one运行相关的所有任务,然后是与fake_table_two. 如何在 Airflow 中完成此操作?

完整代码如下:

for tbl_name in list_of_table_names:

    # Check if table exists by querying information tables
    def has_table(tbl_name=tbl_name):
        p = JdbcHook('conn_id')
        sql =""" select count(*) from system.tables where name = '{}' """.format(tbl_name.upper())
        count = p.get_records(sql)[0][0] #unpack the list/tuple

        # If the query didn't return rows, branch to Create Table Task
        # otherwise, branch to Dummy Operator (Airflow requires that both branches have a task)
        if count == 0:
            return 'tbl_create_{}'.format(tbl_name)
        else:
            return 'dummy_{}'.format(tbl_name) 

    # run has_table python function
    exists = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=has_table,
        depends_on_past=False,
        dag=dag
    )

    # Dummy Operator
    dummy = DummyOperator(task_id='dummy_{}'.format(tbl_name),dag=dag,depends_on_past=False)

    # Run create table SQL script
    create = JdbcOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql = sql_parse(script_path, 'sql/sql_create/{}.sql'.format(tbl_name)), 
        depends_on_past=False,
        dag = dag
    )

    # Run insert or truncate/replace SQL script
    upsert = JdbcOperator(
        task_id='tbl_upsert_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql = sql_parse(script_path, 'sql/sql_upsert/{}.sql'.format(tbl_name)),
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag = dag
    )

    # Set dependencies
    exists >> create >> upsert 
    exists >> dummy >> upsert
Run Code Online (Sandbox Code Playgroud)

jhn*_*lvr 6

存储对在每个循环结束时添加的最后一个任务的引用。然后,在每个循环开始时,检查 ref 是否存在。如果引用存在,则将其设置为上游。

像这样的东西:

last_task = None

for tbl_name in list_of_table_names:


    # run has_table python function
    exists = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=has_table,
        depends_on_past=False,
        dag=dag
    )

    if last_task:
        last_task >> exists


    ...


    # Run insert or truncate/replace SQL script
    upsert = JdbcOperator(
        task_id='tbl_upsert_{}'.format(tbl_name),
        jdbc_conn_id='conn_id',
        sql = sql_parse(script_path, 'sql/sql_upsert/{}.sql'.format(tbl_name)),
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag = dag
    )

    last_task = upsert

    ...
Run Code Online (Sandbox Code Playgroud)