BigQuery - LoadJobConfig vs QueryJobConfig when updating a schema

nxh*_*991 4 python google-bigquery google-cloud-platform

I ran into the following situation: I am trying to update the schema of a partitioned table (partitioned by a time-unit column). I used this article and this example as my references, and the documentation says:

schemaUpdateOptions[]: Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by a partition decorator. For normal tables, WRITE_TRUNCATE will always overwrite the schema.
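The second case in that quote relies on a partition decorator. As a minimal sketch of what it describes (project, dataset, table, bucket, and column names below are hypothetical, not from the question), a load job can truncate a single partition while allowing a field addition:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical names; adjust to your project.
dataset_ref = bigquery.DatasetReference("your-project", "your_dataset")
# The "$20210101" decorator addresses the single day partition 2021-01-01.
partition_ref = dataset_ref.table("partitioned_table$20210101")

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    schema=[
        bigquery.SchemaField("event_date", "DATE"),
        bigquery.SchemaField("full_name", "STRING"),
    ],
)

load_job = client.load_table_from_uri(
    "gs://your-bucket/your_file.csv", partition_ref, job_config=job_config
)
load_job.result()  # Only the 2021-01-01 partition is replaced.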

So my understanding is that, with LoadJobConfig().schema_update_options = [bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION]:

  • For normal tables, depending on LoadJobConfig().write_disposition:
    - WRITE_APPEND: the schema is updated successfully and the new data is appended to the table.
    - WRITE_TRUNCATE: the schema is updated successfully, but the table is overwritten with the new data.
  • For partitioned tables, depending on LoadJobConfig().write_disposition (see the sketch after this list):
    - WRITE_APPEND: not allowed - error message: "Invalid: Schema update options should only be specified with WRITE_APPEND disposition, or with WRITE_TRUNCATE disposition on a table partition."
    - WRITE_TRUNCATE: the schema is updated successfully, but the table is overwritten with the new data.
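A minimal sketch of the load-job call described above (table and column names are hypothetical; the table is assumed to be partitioned by a DATE column):

import io
from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.partitioned_table"  # hypothetical

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    schema=[
        bigquery.SchemaField("event_date", "DATE"),
        bigquery.SchemaField("full_name", "STRING"),
        # The new NULLABLE column that should trigger the schema update.
        bigquery.SchemaField("favorite_color", "STRING"),
    ],
)

data = b'{"event_date": "2021-01-01", "full_name": "Timmy", "favorite_color": "Blue"}\n'
load_job = client.load_table_from_file(
    io.BytesIO(data), table_id, job_config=job_config
)
# The questioner reports that result() raises the "Invalid: Schema update
# options..." error here for a column-partitioned table.
load_job.result()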

This is always the case when I use LoadJobConfig(), but the situation changes if I use QueryJobConfig() instead.

Indeed, the same still holds for normal tables, but for partitioned tables, even with write_disposition=WRITE_APPEND, the schema is updated successfully and the new data is appended to the table!
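For comparison, a minimal sketch of the query-job variant that appends to the same (hypothetical) partitioned table and succeeds:

from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.partitioned_table"  # hypothetical

job_config = bigquery.QueryJobConfig(
    destination=table_id,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

query_job = client.query(
    'SELECT DATE "2021-01-01" AS event_date,'
    ' "Timmy" AS full_name, "Blue" AS favorite_color',
    job_config=job_config,
)
query_job.result()  # Schema is updated and the row is appended.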

How can this behavior be explained? Is there something special about QueryJobConfig()? Or have I misunderstood something?

Thank you very much!!

小智 5

There are some subtle differences between the two classes, so I suggest you pay attention to each class's default configuration. If either one ends up returning an error, it may be caused by incorrect initialization of the configuration; reviewing how each one is set up should let you resolve your problem.

QueryJobConfig

QueryJobConfig(**kwargs)

Configuration options for query jobs.

All properties in this class are optional. Values which are :data:`None` use the server defaults. Set properties on the constructed configuration by using the property name as the name of a keyword argument.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"

# Retrieves the destination table and checks the length of the schema.
table = client.get_table(table_id)  # Make an API request.
print("Table {} contains {} columns".format(table_id, len(table.schema)))

# Configures the query to append the results to a destination table,
# allowing field addition.
job_config = bigquery.QueryJobConfig(
    destination=table_id,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Start the query, passing in the extra configuration.
query_job = client.query(
    # In this example, the existing table contains only the 'full_name' and
    # 'age' columns, while the results of this query will contain an
    # additional 'favorite_color' column.
    'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;',
    job_config=job_config,
)  # Make an API request.
query_job.result()  # Wait for the job to complete.

# Checks the updated length of the schema.
table = client.get_table(table_id)  # Make an API request.
print("Table {} now contains {} columns".format(table_id, len(table.schema)))

LoadJobConfig

LoadJobConfig(**kwargs)

Configuration options for load jobs.

Set properties on the constructed configuration by using the property name as the name of a keyword argument. Values which are unset or :data:`None` use the BigQuery REST API default values. See the BigQuery REST API reference documentation for a list of default values.

Required options differ based on the source_format value. For example, the BigQuery API's default value for source_format is "CSV". When loading a CSV file, either schema must be set or autodetect must be set to :data:`True`.

# from google.cloud import bigquery
# client = bigquery.Client()
# project = client.project
# dataset_ref = bigquery.DatasetReference(project, 'my_dataset')
# filepath = 'path/to/your_file.csv'

# Retrieves the destination table and checks the length of the schema
table_id = "my_table"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table {} contains {} columns.".format(table_id, len(table.schema)))

# Configures the load job to append the data to the destination table,
# allowing field addition
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
# In this example, the existing table contains only the 'full_name' column.
# 'REQUIRED' fields cannot be added to an existing schema, so the
# additional column must be 'NULLABLE'.
job_config.schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

with open(filepath, "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  # Must match the destination dataset location.
        job_config=job_config,
    )  # API request

job.result()  # Waits for table load to complete.
print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, dataset_ref.dataset_id, table_ref.table_id
    )
)

# Checks the updated length of the schema
table = client.get_table(table_ref)
print("Table {} now contains {} columns.".format(table_id, len(table.schema)))

It should be noted that google.cloud.bigquery.job.SchemaUpdateOption is shared by both classes: it specifies the updates to the destination table schema that are allowed as a side effect of the job.
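As a quick illustration (the two constants below are the real enum values; the rest is a sketch), the same option values can be passed to either configuration:

from google.cloud import bigquery

# The two options SchemaUpdateOption defines:
# - ALLOW_FIELD_ADDITION: new columns may be added to the schema.
# - ALLOW_FIELD_RELAXATION: REQUIRED columns may be relaxed to NULLABLE.
load_cfg = bigquery.LoadJobConfig(
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION]
)
query_cfg = bigquery.QueryJobConfig(
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION]
)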