我正在尝试访问Airflow任务中的外部文件来读取一些sql,我得到"找不到文件".有人遇到过这个吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'my_dat',
start_date=datetime(2017, 1, 1),
catchup=False,
schedule_interval=timedelta(days=1)
)
def run_query():
# read the query
query = open('sql/queryfile.sql')
# run the query
execute(query)
tas = PythonOperator(
task_id='run_query', dag=dag, python_callable=run_query)
Run Code Online (Sandbox Code Playgroud)
日志状态如下:
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
Run Code Online (Sandbox Code Playgroud)
我知道我可以简单地将查询复制并粘贴到同一个文件中,但实际上并不是很简洁.有多个查询,文本真的很大,嵌入Python代码会损害可读性.
我在气流中有一个操作员:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在,我需要运行的实际查询有 24 行。我想将它保存在一个文件中,并为操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理需要 SQL 的参数。
建议?
编辑:这是我的代码:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
sql = '{{ templates_dict.sql }}',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' : TABLE_NAME},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
这给出:
jinja2.exceptions.UndefinedError: 'templates_dict' 未定义