p.m*_*aes 17 python celery celery-task airflow
我试图使用Airflow来执行一个简单的任务python.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果我尝试,例如:
气流测试python_test打印2015-01-01
有用!
现在我想把我的def print_context(ds, **kwargs)函数放在其他python文件中.所以我创建了名为:simple_test.py的antoher文件并更改:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在我尝试再次运行:
气流测试python_test打印2015-01-01
好的!它仍然有效!
但是,如果我创建一个模块,例如,带有文件的worker模块SimplePython.py,import(from worker import SimplePython)它并尝试:
气流测试python_test打印2015-01-01
它给出了这样的信息:
ImportError:没有名为worker的模块
问题:
ImD*_*enG 11
您可以按照以下方式打包DAG的依赖项:
https://airflow.apache.org/concepts.html#packaged-dags
为此,您可以创建一个zip文件,其中包含zip文件根目录中的dag,并在目录中解压缩额外的模块.例如,您可以创建一个如下所示的zip文件:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
Run Code Online (Sandbox Code Playgroud)
Airflow将扫描zip文件并尝试加载my_dag1.py和my_dag2.py.它不会进入子目录,因为它们被认为是潜在的包.
使用CeleryExecutor时,您需要手动同步DAG目录,Airflow不会为您处理:
工作人员需要访问其DAGS_FOLDER,您需要通过自己的方式同步文件系统
我见过的唯一受支持的解决方案是将文档中的dags打包为zip文件,但您也可以导入dags文件夹中的模块。如果您使用其他工具(例如puppet&git)自动同步dags文件夹,则此功能很有用。
从问题上我不清楚您的目录结构,因此这是一个基于典型python项目结构的示例dags文件夹:
??? airflow/dags # root airflow dags folder where all dags live
??? my_dags # git repo project root
??? my_dags # python src root (usually named same as project)
? ??? my_test_globals.py # file I want to import
? ??? dag_in_package.py
? ??? dags
? ??? dag_in_subpackage.py
??? README.md # also setup.py, LICENSE, etc here
??? dag_in_project_root.py
Run Code Online (Sandbox Code Playgroud)
我遗漏了(必需的[ 1 ])__init__.py文件。请注意三个示例dag的位置。您几乎肯定会在所有dag中仅使用这些位置之一。为了示例起见,我将它们全部包括在这里,因为导入无关紧要。要从my_test_globals其中任何一个导入:
from my_dags.my_dags import my_test_globals
Run Code Online (Sandbox Code Playgroud)
我相信这意味着气流会在python路径设置为dags目录的情况下运行,因此dags文件夹的每个子目录都可以视为python软件包。在我的情况下,这是附加的中间项目根目录,妨碍了进行典型的包内绝对导入。因此,我们可以像下面这样重组气流项目:
??? airflow/dags # root airflow dags folder where all dags live
??? my_dags # git repo project root & python src root
??? my_test_globals.py # file I want to import
??? dag_in_package.py
??? dags
? ??? dag_in_subpackage.py
??? README.md # also setup.py, LICENSE, etc here
??? dag_in_project_root.py
Run Code Online (Sandbox Code Playgroud)
因此,进口看上去就像我们期望的那样:
from my_dags import my_test_globals
Run Code Online (Sandbox Code Playgroud)
对于你的第一个问题,这是有可能的。
我想你应该__init__.py在同一目录下创建一个名为SimplePython.py(在你的情况下是worker目录)的空文件。通过这样做,该worker目录将被视为 python 模块。
然后在您的 DAG 定义中,尝试from worker.SimplePython import print_context.
在你的情况下,我想如果你为airflow编写一个插件会更好,因为你可能想升级airflow核心项目而不删除你的自定义功能。
| 归档时间: |
|
| 查看次数: |
17299 次 |
| 最近记录: |