我正在使用virtualenv.我正在尝试在DAG文件夹中使用包.airflow_home目录的当前状态是:
airflow_home/airflow.cfg
airflow_home/airflow.db
airflow_home/dags/__init__.py
airflow_home/dags/hello_world.py
airflow_home/dags/support/inner.py
airflow_home/dags/support/__init__.py
Run Code Online (Sandbox Code Playgroud)
hello_world.py有代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dags.support import inner
def print_hello():
return 'Hello world'
dag = DAG('hello_world', description='simple tutorial DAG',
schedule_interval='0 12 * * *', start_date=datetime(2017, 8, 20), catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
hello_from_inner_operator = PythonOperator(task_id='hello_from_inner', python_callable=inner.hello_from_inner, dag=dag)
dummy_operator >> hello_operator
hello_operator >> hello_from_inner_operator
Run Code Online (Sandbox Code Playgroud)
如果我可以手动运行此脚本,它将运行.但后来我开始气流调度,
Broken DAG: No module named 'dags'
Run Code Online (Sandbox Code Playgroud)
出现错误.我做错了什么,解决这个问题的方法是什么?
使用from support import inner来代替.
当气流开始时,路径$AIRFLOW_HOME/dags将被添加到路径中sys.path.所以它将搜索dags目录下的模块.