将数据从Google Storage加载到BigQuery时如何执行UPSERT?

Pro*_*120 2 google-bigquery

BigQuery支持以下策略:

WRITE_APPEND -指定可以将行追加到现有表中。

WRITE_EMPTY -指定输出表必须为空。

WRITE_TRUNCATE -指定写应替换表。

它们都不适合UPSERT操作目的。

我正在将订单Json文件导入Google Storage,并希望将其加载到BigQuery中。逻辑提示,某些记录将是新记录,而其他记录已从以前的装载中获取并且需要更新(例如,更新订单状态(新/处于保留状态/已发送/退款等...)

我正在使用Airflow,但我的问题很普遍:

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_orders_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
Run Code Online (Sandbox Code Playgroud)

此代码使用表示WRITE_TRUNCATE这意味着删除整个表并加载请求的文件。

我如何修改它以提供支持UPSERT

我唯一的选择是查询表搜索以找到json中出现的现有订单LOAD吗?删除它们,然后执行?

Fel*_*ffa 5

除了运行之外GoogleCloudStorageToBigQueryOperator,您还可以运行一个查询,该查询将为您提供与upsert相同的结果。

来自https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement的示例:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)
Run Code Online (Sandbox Code Playgroud)

该查询将:

  • 看一下表T(当前)和S(更新)。
  • 如果更新更改了现有行,它将UPDATE在该行上运行。
  • 如果更新的产品尚不存在,它将更新INSERT该行。

现在,BigQuery将如何知道您的表S?您可以: