How to load a dataframe into a BigQuery partitioned table from a Cloud Function using Python

Sim*_*ton 1 python-3.x google-bigquery google-cloud-functions

I'm a bit surprised I couldn't find an answer to this anywhere.

I have the following schema and setup:

job_config = bigquery.LoadJobConfig(schema = [
    bigquery.SchemaField("foo", "STRING"),
    bigquery.SchemaField("Timestamp", "TIMESTAMP"),
    bigquery.SchemaField("bar", "INT64"),
    bigquery.SchemaField("id", "STRING")
])

load_job = bq_client.load_table_from_dataframe(
    df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)

load_job.result()

As you can see, I'm loading a dataframe (df) into BigQuery, and it works fine. However, I'd like to load the dataframe into a partitioned table, using the Timestamp field to define the date of each partition.

How do I do this?

Gui*_*ins 5

You can add time_partitioning to your LoadJobConfig. The TimePartitioning class is described here, and the documentation includes a similar example. TimePartitioning.field specifies which field to use as the partitioning criterion. In your case it could look like this (with a 90-day expiration rule added):

job_config = bigquery.LoadJobConfig(
    schema = [
        bigquery.SchemaField("foo", "STRING"),
        bigquery.SchemaField("Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("bar", "INT64"),
        bigquery.SchemaField("id", "STRING")
    ],
    time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Timestamp",  # field to use for partitioning
        expiration_ms=7776000000  # 90 days
    )
)

We can use the LoadJob result to verify that everything went well:

print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))
Written 4 rows to TableReference(DatasetReference(u'PROJECT_ID', u'test'), 'pandas_partitioned')
Partitioning: TimePartitioning(expirationMs=7776000000,field=Timestamp,type=DAY)

And describe the newly created table:

$ bq show test.pandas_partitioned
Table PROJECT_ID:test.pandas_partitioned

   Last modified            Schema            Total Rows   Total Bytes   Expiration                  Time Partitioning                   Clustered Fields   Labels  
 ----------------- ------------------------- ------------ ------------- ------------ -------------------------------------------------- ------------------ -------- 
  21 Dec 10:01:42   |- Timestamp: timestamp   4            107                        DAY (field: Timestamp, expirationMs: 7776000000)                              
                    |- bar: integer                                                                                                                                 
                    |- foo: string                                                                                                                                  
                    |- id: string
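As an additional check (not part of the original answer), you can group the table by partition day to confirm that each row landed in the partition matching its Timestamp. A minimal sketch, reusing the bq_client from the full code below:

query = """
    SELECT DATE(Timestamp) AS day, COUNT(*) AS num_rows
    FROM `PROJECT_ID.test.pandas_partitioned`
    GROUP BY day
    ORDER BY day
"""
# One result row per populated daily partition
for row in bq_client.query(query).result():
    print(row.day, row.num_rows)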

Full code:

from datetime import datetime, timedelta

import pandas as pd
from google.cloud import bigquery


PROJECT = "PROJECT_ID"
DATASET = "test"
TABLE = "pandas_partitioned"

bq_client = bigquery.Client(project=PROJECT)

job_config = bigquery.LoadJobConfig(
    schema = [
        bigquery.SchemaField("foo", "STRING"),
        bigquery.SchemaField("Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("bar", "INT64"),
        bigquery.SchemaField("id", "STRING")
    ],
    time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Timestamp",  # name of the column to use for partitioning
        expiration_ms=7776000000  # 90 days
    )
)

data = {"foo": ["fighters", "manchu", "bar", "kien"],
        "Timestamp": [datetime.now() - timedelta(days=i) for i in range(4)],
        "bar": [100, 50, 75, 66],
        "id": ["1", "2", "3", "14"]}

df = pd.DataFrame(data)

load_job = bq_client.load_table_from_dataframe(
    df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)

result = load_job.result()

print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))
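Since the question is about running this from a Cloud Function, below is a minimal sketch of how the same load could be wrapped in an HTTP-triggered function. The entry point name load_to_bq and the inline sample DataFrame are assumptions, not part of the original answer; requirements.txt would need google-cloud-bigquery, pandas and pyarrow (pyarrow is what load_table_from_dataframe uses to serialize the dataframe):

# main.py -- hypothetical HTTP-triggered Cloud Function; one way to deploy it:
#   gcloud functions deploy load_to_bq --runtime python39 --trigger-http
from datetime import datetime, timedelta

import pandas as pd
from google.cloud import bigquery

PROJECT = "PROJECT_ID"
DATASET = "test"
TABLE = "pandas_partitioned"

# Client created at module level so it is reused across warm invocations.
bq_client = bigquery.Client(project=PROJECT)

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("foo", "STRING"),
        bigquery.SchemaField("Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("bar", "INT64"),
        bigquery.SchemaField("id", "STRING")
    ],
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Timestamp",          # column used to assign each row to a partition
        expiration_ms=7776000000    # 90 days
    )
)


def load_to_bq(request):
    """HTTP entry point: builds a small DataFrame and loads it into the partitioned table."""
    df = pd.DataFrame({
        "foo": ["fighters", "manchu", "bar", "kien"],
        "Timestamp": [datetime.now() - timedelta(days=i) for i in range(4)],
        "bar": [100, 50, 75, 66],
        "id": ["1", "2", "3", "14"]
    })

    load_job = bq_client.load_table_from_dataframe(
        df, ".".join([PROJECT, DATASET, TABLE]), job_config=job_config
    )
    result = load_job.result()  # wait for the load to finish before responding

    return "Written {} rows to {}".format(result.output_rows, result.destination)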