Airflow中的动态任务定义

Daw*_*wid 9 python bash orchestration airflow

我目前正在尝试使用Airflow编排一个过程,其中一些运算符是动态定义的,并依赖于另一个(早期)运算符的输出.

在下面的代码中,t1更新了一个带有新记录的文本文件(这些记录实际上是从外部队列中读取的,但为简单起见,我在这里将它们硬编码为A,B和C).然后,我想为从该文本文件读取的每个记录创建单独的运算符.这些运算符将分别创建目录A,B和C,并且在Airflow UI中将被视为单独的bash进程Create_directory_A,Create_directory_B和Create_directory_C.

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    txt_file = open('text_file.txt', 'r')
    return [element for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )

    t1 >> t2
Run Code Online (Sandbox Code Playgroud)

Airflow的文档中,我可以看到调度程序将定期执行它[DAG]以反映更改(如果有的话).这是否意味着存在这样的风险:即使我的t1运算符在t2之前执行,也会在更新之前为记录列表创建bash运算符(就像评估DAG时那样)?

Eri*_*and 12

您无法动态创建依赖于上游任务输出的任务.你正在混合计划和执行时间.在计划时创建DAG 定义任务.在执行时创建DAG 运行任务实例.只有任务实例才能生成输出.

Airflow调度程序将使用计划时text_file.txt包含的任何内容构建动态图.然后将这些任务运送给工人.

工作人员最终将执行t1任务实例并创建新任务text_file.txt,但此时,t2任务列表已由调度程序计算并发送给工作人员.

因此,下次调度程序决定运行DAG时,将使用最新的t1任务实例转储text_file.txt.

如果您的任务很快并且您的工作人员没有积压,那么这将是之前DAG运行的内容.如果它们被积压,text_file.txt内容可能是陈旧的,如果你真的不走运,调度程序在任务实例写入文件时读取文件,你将得到不完整的数据read_text().