气流 - Python文件不在同一个DAG文件夹中

p.m*_*aes 17 python celery celery-task airflow

我试图使用Airflow来执行一个简单的任务python.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果我尝试,例如:

气流测试python_test打印2015-01-01

有用!

现在我想把我的def print_context(ds, **kwargs)函数放在其他python文件中.所以我创建了名为:simple_test.py的antoher文件并更改:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

现在我尝试再次运行:

气流测试python_test打印2015-01-01

好的!它仍然有效!

但是,如果我创建一个模块,例如,带有文件的worker模块SimplePython.py,import(from worker import SimplePython)它并尝试:

气流测试python_test打印2015-01-01

它给出了这样的信息:

ImportError:没有名为worker的模块

问题:

  1. 是否可以在DAG定义中导入模块?
  2. Airflow + Celery如何在工作节点上分发所有必需的python源文件?

ImD*_*enG 11

您可以按照以下方式打包DAG的依赖项:

https://airflow.apache.org/concepts.html#packaged-dags

为此,您可以创建一个zip文件,其中包含zip文件根目录中的dag,并在目录中解压缩额外的模块.例如,您可以创建一个如下所示的zip文件:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
Run Code Online (Sandbox Code Playgroud)

Airflow将扫描zip文件并尝试加载my_dag1.py和my_dag2.py.它不会进入子目录,因为它们被认为是潜在的包.

使用CeleryExecutor时,您需要手动同步DAG目录,Airflow不会为您处理:

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

工作人员需要访问其DAGS_FOLDER,您需要通过自己的方式同步文件系统


7yl*_*l4r 6

我见过的唯一受支持的解决方案是将文档中的dags打包为zip文件,但您也可以导入dags文件夹中的模块。如果您使用其他工具(例如puppet&git)自动同步dags文件夹,则此功能很有用。

从问题上我不清楚您的目录结构,因此这是一个基于典型python项目结构的示例dags文件夹:

??? airflow/dags  # root airflow dags folder where all dags live
    ??? my_dags  # git repo project root
        ??? my_dags  # python src root (usually named same as project)
        ?   ??? my_test_globals.py  # file I want to import
        ?   ??? dag_in_package.py 
        ?   ??? dags 
        ?        ??? dag_in_subpackage.py
        ??? README.md  # also setup.py, LICENSE, etc here
        ??? dag_in_project_root.py
Run Code Online (Sandbox Code Playgroud)

我遗漏了(必需的[ 1 ])__init__.py文件。请注意三个示例dag的位置。您几乎肯定会在所有dag中仅使用这些位置之一。为了示例起见,我将它们全部包括在这里,因为导入无关紧要。要从my_test_globals其中任何一个导入:

from my_dags.my_dags import my_test_globals
Run Code Online (Sandbox Code Playgroud)

我相信这意味着气流会在python路径设置为dags目录的情况下运行,因此dags文件夹的每个子目录都可以视为python软件包。在我的情况下,这是附加的中间项目根目录,妨碍了进行典型的包内绝对导入。因此,我们可以像下面这样重组气流项目:

??? airflow/dags  # root airflow dags folder where all dags live
    ??? my_dags  # git repo project root & python src root
        ??? my_test_globals.py  # file I want to import
        ??? dag_in_package.py 
        ??? dags 
        ?    ??? dag_in_subpackage.py
        ??? README.md  # also setup.py, LICENSE, etc here
        ??? dag_in_project_root.py
Run Code Online (Sandbox Code Playgroud)

因此,进口看上去就像我们期望的那样:

from my_dags import my_test_globals
Run Code Online (Sandbox Code Playgroud)


Yon*_*yiw 1

对于你的第一个问题,这是有可能的。

我想你应该__init__.py在同一目录下创建一个名为SimplePython.py(在你的情况下是worker目录)的空文件。通过这样做,该worker目录将被视为 python 模块。

然后在您的 DAG 定义中,尝试from worker.SimplePython import print_context.

在你的情况下,我想如果你为airflow编写一个插件会更好,因为你可能想升级airflow核心项目而不删除你的自定义功能。