相关疑难解决方法(0)

Airflow DAG中的外部文件

我正在尝试访问Airflow任务中的外部文件来读取一些sql,我得到"找不到文件".有人遇到过这个吗?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)
Run Code Online (Sandbox Code Playgroud)

日志状态如下:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
Run Code Online (Sandbox Code Playgroud)

我知道我可以简单地将查询复制并粘贴到同一个文件中,但实际上并不是很简洁.有多个查询,文本真的很大,嵌入Python代码会损害可读性.

python airflow

11
推荐指数
3
解决办法
8225
查看次数

如何将 SQL 作为带参数的文件传递给 Airflow Operator

我在气流中有一个操作员:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

现在,我需要运行的实际查询有 24 行。我想将它保存在一个文件中,并为操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理需要 SQL 的参数。

建议?

编辑:这是我的代码:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

这给出:

jinja2.exceptions.UndefinedError: 'templates_dict' 未定义

airflow

10
推荐指数
1
解决办法
9978
查看次数

标签 统计

airflow ×2

python ×1