如何将 SQL 作为带参数的文件传递给 Airflow Operator

Pro*_*120 10 airflow

我在气流中有一个操作员:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

现在,我需要运行的实际查询有 24 行。我想将它保存在一个文件中,并为操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理需要 SQL 的参数。

建议?

编辑:这是我的代码:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

这给出:

jinja2.exceptions.UndefinedError: 'templates_dict' 未定义

Men*_*hak 14

正如您所注意到的,MySqlToGoogleCloudStorageOperator指定了一个template_ext.sql 扩展名。

首先在您的 中Dag,指定放置 .sql 文件的路径

dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])
Run Code Online (Sandbox Code Playgroud)

在 yourfile.sql 中放置您的大型查询。请注意params.ord_id

SELECT * FROM orders where orderid> {{ params.ord_id }}
Run Code Online (Sandbox Code Playgroud)

现在在sql运算符的参数中,传递文件的名称。

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

重要的是不要在该文件名后放置空格。这是因为 Jinja 模板引擎将查找以 结尾的字符串.sql,如果找到,它会将其视为文件而不是字符串。