使用 BigQueryInsertJobOperator 而不是 BigQueryExecuteQueryOperator

zac*_*ack 11 google-bigquery airflow

最近更新了我的 Airflow,发现BigQueryExecuteQueryOperator已被弃用,取而代之的是BigQueryInsertJobOperator. 该文档似乎相当模糊,链接到REST 资源:jobs(和方法:jobs.query)。特别是,我不清楚是否有任何地方可以指定write_dispositiondestination_dataset_table等。我想确保我不会让事情变得过于复杂。

我目前在哪里做

# my.sql
SELECT * FROM `proj.ds.table_1`
---------------------------------
# my-dag.py
BigQueryExecuteQueryOperator(
    task_id='copy-table-1',
    sql = 'my.sql',
    destination_dataset_table='proj:ds.table_2',
    write_disposition='WRITE_EMPTY',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

我现在需要使用DDL 语句

# my.sql
CREATE TABLE IF NOT EXISTS
ds.table_2
AS (
  SELECT * FROM `proj.ds.table_1`
)
---------------------------------
# my-dag.py
BigQueryInsertJobOperator(
    task_id='copy-table-1',
    configuration={
        'query': my.sql
    },
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

更不用说以QueryParameter 文档建议的格式将参数传递给查询,而不仅仅是params={ 'table': THE_TABLE }......

zac*_*ack 8

这是要遵循的 API 文档BigQueryInsertJobOperatorhttps://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery

我使用此方法来获取要写入 BQ 的模板化查询,而不是搞乱 Google 的参数化 SQL 查询:

# my.sql
SELECT * FROM `{PROJECT}.{DATASET}.{TBL_TO_MOVE}`
---------------------------------
# my-dag.py
PROJECT = 'my-project'
DATASET = 'my-dataset'
TBL_TO_MOVE = 'some-table'

DESTINATION_DS = 'other-dataset'
DESTINATION_TBL = 'other-table'

BigQueryInsertJobOperator(
    task_id='copy-table-1',
    configuration={
        'query': {
            'query': open('my.sql', 'r').read().format(**locals()),
            'destinationTable': {
                'projectId': PROJECT,
                'datasetId': DESTINATION_DS,
                'tableId': DESTINATION_TBL
            },
            'useLegacySql': False,
            'allowLargeResults': True,
        }
    },
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

读取open('my.sql', ...)SQL 文件,然后使用局部变量代替大括号变量(例如,{PROJECT}被 替换my-project)。