Sim*_*ton 1 python-3.x google-bigquery google-cloud-functions
我有点惊讶我在任何地方都找不到这个答案。
我有以下架构和设置:
job_config = bigquery.LoadJobConfig(schema = [
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("Timestamp", "TIMESTAMP"),
bigquery.SchemaField("bar", "INT64"),
bigquery.SchemaField("id", "STRING")
])
load_job = bq_client.load_table_from_dataframe(
df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)
load_job.result()
Run Code Online (Sandbox Code Playgroud)
如您所见,我正在将数据框表 ( df
) 加载到 BigQuery。它运行良好。但是,我想将数据帧加载到分区表中,并使用时间戳字段来定义每个分区表的日期。
我该怎么做?
您可以修改time_partitioning
为您的LoadJobConfig
. 可以在此处TimePartitioning
找到该类的描述,并在文档中找到类似的示例。用于指定使用哪个字段作为分区标准。在您的情况下,它可能是这样的(添加 90 天过期规则):TimePartitioning.field
job_config = bigquery.LoadJobConfig(
schema = [
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("Timestamp", "TIMESTAMP"),
bigquery.SchemaField("bar", "INT64"),
bigquery.SchemaField("id", "STRING")
],
time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="Timestamp", # field to use for partitioning
expiration_ms=7776000000 # 90 days
)
)
Run Code Online (Sandbox Code Playgroud)
我们可以使用LoadJob
结果来验证一切正常:
print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))
Run Code Online (Sandbox Code Playgroud)
Written 4 rows to TableReference(DatasetReference(u'PROJECT_ID', u'test'), 'pandas_partitioned')
Partitioning: TimePartitioning(expirationMs=7776000000,field=Timestamp,type=DAY)
Run Code Online (Sandbox Code Playgroud)
并描述新创建的表:
$ bq show test.pandas_partitioned
Table PROJECT_ID:test.pandas_partitioned
Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Clustered Fields Labels
----------------- ------------------------- ------------ ------------- ------------ -------------------------------------------------- ------------------ --------
21 Dec 10:01:42 |- Timestamp: timestamp 4 107 DAY (field: Timestamp, expirationMs: 7776000000)
|- bar: integer
|- foo: string
|- id: string
Run Code Online (Sandbox Code Playgroud)
完整代码:
from datetime import datetime
from datetime import timedelta
import pandas as pd
from google.cloud import bigquery
PROJECT = "PROJECT_ID"
DATASET = "test"
TABLE = "pandas_partitioned"
bq_client = bigquery.Client(project=PROJECT)
job_config = bigquery.LoadJobConfig(
schema = [
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("Timestamp", "TIMESTAMP"),
bigquery.SchemaField("bar", "INT64"),
bigquery.SchemaField("id", "STRING")
],
time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="Timestamp", # name of the column to use for partitioning
expiration_ms=7776000000 # 90 days
)
)
data = {"foo": ["fighters", "manchu", "bar", "kien"],
"Timestamp": [datetime.now() - timedelta(days=i) for i in range(4)],
"bar": [100, 50, 75, 66],
"id": ["1", "2", "3", "14"]}
df = pd.DataFrame(data)
load_job = bq_client.load_table_from_dataframe(
df, '.'.join([PROJECT, DATASET, TABLE]), job_config = job_config
)
result = load_job.result()
print("Written {} rows to {}".format(result.output_rows, result.destination))
print("Partitioning: {}".format(result.time_partitioning))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5187 次 |
最近记录: |