zac*_*ack 11 google-bigquery airflow
最近更新了我的 Airflow,发现BigQueryExecuteQueryOperator
已被弃用,取而代之的是BigQueryInsertJobOperator
. 该文档似乎相当模糊,链接到REST 资源:jobs(和方法:jobs.query)。特别是,我不清楚是否有任何地方可以指定write_disposition
、destination_dataset_table
等。我想确保我不会让事情变得过于复杂。
我目前在哪里做
# my.sql
SELECT * FROM `proj.ds.table_1`
---------------------------------
# my-dag.py
BigQueryExecuteQueryOperator(
task_id='copy-table-1',
sql = 'my.sql',
destination_dataset_table='proj:ds.table_2',
write_disposition='WRITE_EMPTY',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
我现在需要使用DDL 语句吗
# my.sql
CREATE TABLE IF NOT EXISTS
ds.table_2
AS (
SELECT * FROM `proj.ds.table_1`
)
---------------------------------
# my-dag.py
BigQueryInsertJobOperator(
task_id='copy-table-1',
configuration={
'query': my.sql
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
更不用说以QueryParameter 文档建议的格式将参数传递给查询,而不仅仅是params={ 'table': THE_TABLE }
......
这是要遵循的 API 文档BigQueryInsertJobOperator
:https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery。
我使用此方法来获取要写入 BQ 的模板化查询,而不是搞乱 Google 的参数化 SQL 查询:
# my.sql
SELECT * FROM `{PROJECT}.{DATASET}.{TBL_TO_MOVE}`
---------------------------------
# my-dag.py
PROJECT = 'my-project'
DATASET = 'my-dataset'
TBL_TO_MOVE = 'some-table'
DESTINATION_DS = 'other-dataset'
DESTINATION_TBL = 'other-table'
BigQueryInsertJobOperator(
task_id='copy-table-1',
configuration={
'query': {
'query': open('my.sql', 'r').read().format(**locals()),
'destinationTable': {
'projectId': PROJECT,
'datasetId': DESTINATION_DS,
'tableId': DESTINATION_TBL
},
'useLegacySql': False,
'allowLargeResults': True,
}
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
读取open('my.sql', ...)
SQL 文件,然后使用局部变量代替大括号变量(例如,{PROJECT}
被 替换my-project
)。
归档时间: |
|
查看次数: |
16814 次 |
最近记录: |