我正在尝试访问Airflow任务中的外部文件来读取一些sql,我得到"找不到文件".有人遇到过这个吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'my_dat',
start_date=datetime(2017, 1, 1),
catchup=False,
schedule_interval=timedelta(days=1)
)
def run_query():
# read the query
query = open('sql/queryfile.sql')
# run the query
execute(query)
tas = PythonOperator(
task_id='run_query', dag=dag, python_callable=run_query)
Run Code Online (Sandbox Code Playgroud)
日志状态如下:
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
Run Code Online (Sandbox Code Playgroud)
我知道我可以简单地将查询复制并粘贴到同一个文件中,但实际上并不是很简洁.有多个查询,文本真的很大,嵌入Python代码会损害可读性.
这是一个使用Variable的例子,使其变得简单.
首先在- > - >中添加变量,例如.`{key:'sql_path',值:'your_sql_script_folder'}Airflow UI
Admin
Variable
然后在DAG中添加以下代码,以使用刚刚添加的Airflow变量.
DAG代码
import airflow
from airflow.models import Variable
tmpl_search_path = Variable.get("sql_path")
dag = airflow.DAG(
'tutorial',
schedule_interval="@daily",
template_searchpath=tmpl_search_path, # this
default_args=default_args
)
Run Code Online (Sandbox Code Playgroud)
现在您可以在文件夹Variable下使用sql脚本名称或路径
你可以在这里学到更多
假设该sql
目录是相对于当前Python文件的,你可以像这样算出sql文件的绝对路径:
import os
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
def run_query():
# read the query
query = open(f"{CUR_DIR}/sql/queryfile.sql")
# run the query
execute(query)
Run Code Online (Sandbox Code Playgroud)
所有相对路径均参考AIRFLOW_HOME环境变量。尝试:
归档时间: |
|
查看次数: |
8225 次 |
最近记录: |