Airflow DAG中的外部文件

Ale*_*ani 11 python airflow

我正在尝试访问Airflow任务中的外部文件来读取一些sql,我得到"找不到文件".有人遇到过这个吗?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)
Run Code Online (Sandbox Code Playgroud)

日志状态如下:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
Run Code Online (Sandbox Code Playgroud)

我知道我可以简单地将查询复制并粘贴到同一个文件中,但实际上并不是很简洁.有多个查询,文本真的很大,嵌入Python代码会损害可读性.

zho*_*jie 9

这是一个使用Variable的例子,使其变得简单.

  • 首先在- > - >中添加变量,例如.`{key:'sql_path',值:'your_sql_script_folder'}Airflow UIAdminVariable

  • 然后在DAG中添加以下代码,以使用刚刚添加的Airflow变量.

DAG代码

import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
   'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
Run Code Online (Sandbox Code Playgroud)
  • 现在您可以在文件夹Variable下使用sql脚本名称或路径

  • 你可以在这里学到更多

  • 我认为这不适用于 OP 与“PythonOperator”和 Python 原生“open()”一起使用的示例 DAG。PythonOperator 在 pod 中运行,该 pod 无法访问与解析 DAG 的进程相同的一组位置。 (3认同)
  • @RicardoMS嗨,当你想定义自己的`airflow.models.Variable`时,最简单的方法是通过Airflow UI,`homepage -> Admin -> Variables`来创建新变量,例如:`{'Key': ' RicardoMS_variable', 'Val': '/opt/specific/path'}`。完成后,您可以使用示例代码通过 `tmpl_search_path = Variable.get("RicardoMS_variable")` 加载变量,而不是直接使用 `'/opt/special/path'` (2认同)

Jak*_*ake 9

假设该sql目录是相对于当前Python文件的,你可以像这样算出sql文件的绝对路径:

import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    query = open(f"{CUR_DIR}/sql/queryfile.sql")
    # run the query
    execute(query)
Run Code Online (Sandbox Code Playgroud)


Pri*_*hta 6

所有相对路径均参考AIRFLOW_HOME环境变量。尝试:

  • 给出绝对路径
  • 将文件相对于AIRFLOW_HOME
  • 尝试在 python callable 中记录PWD,然后决定提供什么路径(最佳选项)

  • 很好的评论,但不幸的是 AIRFLOW_HOME 是一个可选的环境变量 - 没有它 Airflow 工作得很好 - 你不能保证它会被设置。 (2认同)