如何在气流 dags 之间管理 python 包?

lam*_*ter 6 airflow

如果我有多个具有重叠 python 包依赖项的气流 dag,我该如何保留这些项目中的每一个。解耦?例如。如果我在同一台服务器上有项目 A 和 B,我会用类似的东西运行它们中的每一个......

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
Run Code Online (Sandbox Code Playgroud)

基本上,想在相同的情况下运行 dag(例如,每个 dag 使用 python 脚本,这些脚本可能具有重叠的包 deps。我想单独开发(即,在想要更新时不必使用包更新所有代码)该软件包仅用于一个项目))。请注意,我一直在使用BashOperator来运行 python 任务,例如...

do_stuff = BashOperator(
        task_id='my_task',
        bash_command='python /path/to/script.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

有没有办法让这个工作?气流是否有其他最佳实践方式供人们解决(或避免)这些问题?

lam*_*ter 5

基于来自 apache-airflow 邮件列表的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单的答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。

source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
Run Code Online (Sandbox Code Playgroud)

会翻译成类似的东西

do_stuff_a = BashOperator(
        task_id='my_task_a',
        bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
do_stuff_b = BashOperator(
        task_id='my_task_b',
        bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
        execution_timeout=timedelta(minutes=30),
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

在气流中。


对于将 args 传递给 Tasks 的问题,这取决于您要传入的 args 的性质。在我的情况下,某些 args 取决于运行 dag 当天数据表的样子(例如. 表中最高时间戳记录等)。要将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在配置 dag 中,有一个 Task 可以将“真实”dag 的 args 生成为 python dict 并转换为 pickle 文件。然后“配置”dag 有一个任务,它是一个TriggerDagRunOperator激活“真实”dag的任务,它具有从“配置”dag 生成的泡菜文件中读取的初始逻辑(在我的情况下,作为 a Dict),我将其读入bash_command字符串之类的bash_command=f"python script.py {configs['arg1']}"

  • 在这种情况下你如何通过论证?PythonOperator 中有类似 op_args / op_kwargs 的东西吗? (3认同)

小智 4

您可以使用打包的 dags,其中每个 dag 与其依赖项一起打包 http://airflow.apache.org/concepts.html#packaged-dags