如何在虚拟环境中运行Airflow PythonOperator

khu*_*834 10 python virtualenv airflow

我有几个python文件,我目前正在使用BashOperator执行.这使我可以灵活地轻松选择python虚拟环境.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
   'owner': 'airflow',
    'depends_on_past': False,
    ...}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

t1 = BashOperator(
                 task_id='task1',
                bash_command='~/anaconda3/envs/myenv/bin/python 
                              /python_files/python_task1.py',
                 dag=dag)
Run Code Online (Sandbox Code Playgroud)

如何使用PythonOperator实现相同的目标呢?

from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1

python_task = PythonOperator(
              task_id='python_task',
              python_callable=python_task1.main,
             dag=dag)
Run Code Online (Sandbox Code Playgroud)

我假设PythonOperator将使用系统python环境.我发现Airflow有PythonVirtualenvOperator,但这似乎可以通过使用指定的要求动态创建新的虚拟环境来实现.我更喜欢使用已经正确配置的现有服务器.如何使用指定的python路径运行PythonOperator?

vil*_*asv 10

首先要做的事情是:您(通常)不应该为您的 Operator 依赖预先存在的资源。您的运营商应该是可移植的,因此使用长期存在的 virtualenvs 有点违反该原则。话虽如此,这没什么大不了的,就像您必须将软件包预安装到全局环境中一样,您可以预先烘焙一些环境。或者,您可以让 Operator 创建环境,随后的 Operator 可以重用它——我相信这是最简单也是最危险的方法。

实现“virtualenv 缓存”应该不难。读取PythonVirtualenvOperator的执行方法的实现:

def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)
Run Code Online (Sandbox Code Playgroud)

所以看起来它没有明确删除 virtualenv (它依赖于TemporaryDirectory这样做)。您可以子类化PythonVirtualenvOperator并简单地使用您自己的重用临时目录的上下文管理器:

import glob

@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        existing = glob.glob('/tmp/' + prefix + '*')
        if len(existing):
            name = existing[0]
        else:
            name = mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass

def execute_callable(self):
    with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
        ...
Run Code Online (Sandbox Code Playgroud)

自然,你可以去掉try-finallyinReusableTemporaryDirectory并放回通常的suffixdir论点,我做了最小的改变,以便与原始TemporaryDirectory类进行比较。

这样,您的 virtualenv 不会被丢弃,但更新的依赖项最终将由 Operator 安装。


Dus*_*Sun 7

我的解决方法是使用 Bash Operator 调用 /path/to/project/venv/bin/python my.py