iva*_*arg 3 python google-cloud-dataflow
我有一个以JSON格式定义的非平凡的表模式(涉及嵌套和重复的字段)(具有名称,类型,模式属性)并存储在文件中.它已成功用于使用bq load命令填充bigquery表.
但是当我尝试使用Dataflow Python SDK和BigQuerySink做同样的事情时,schema参数需要是以逗号分隔的'name':'type'元素列表或bigquery.TableSchema对象.
是否有任何方便的方法来获取我的JSON模式bigquery.TableSchema,或者我是否必须将其转换为name:value列表?
发表安德烈Pierleoni上面的代码工作与旧版本的google-cloud-bigqueryPython客户端,例如用于版本0.25.0的google-cloud-bigquery出现这种情况通过安装pip install apache-beam[gcp]。
然而,BigQuery的Python客户端API已经在最近版本的巨大变化google-cloud-bigquery,例如在版本1.8.0是我目前,bigquery.TableFieldSchema()并bigquery.TableSchema()没有工作。
如果您使用的是更新版本的google-cloud-bigquery软件包,以下是SchemaField从 JSON 文件获取所需列表(例如,创建表所需)的方法。这是对 Andrea Pierleoni 上面发布的代码的改编(谢谢!)
def _get_field_schema(field):
name = field['name']
field_type = field.get('type', 'STRING')
mode = field.get('mode', 'NULLABLE')
fields = field.get('fields', [])
if fields:
subschema = []
for f in fields:
fields_res = _get_field_schema(f)
subschema.append(fields_res)
else:
subschema = []
field_schema = bigquery.SchemaField(name=name,
field_type=field_type,
mode=mode,
fields=subschema
)
return field_schema
def parse_bq_json_schema(schema_filename):
schema = []
with open(schema_filename, 'r') as infile:
jsonschema = json.load(infile)
for field in jsonschema:
schema.append(_get_field_schema(field))
return schema
Run Code Online (Sandbox Code Playgroud)
现在,假设您已经在 JSON 中定义了一个表的架构。假设您有这个特定的“schema.json”文件,然后使用上述辅助方法,您可以获得SchemaFieldPython 客户端所需的表示,如下所示:
>>> res_schema = parse_bq_json_schema("schema.json")
>>> print(res_schema)
[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]
Run Code Online (Sandbox Code Playgroud)
现在要使用 Python SDK 创建具有上述架构的表,您可以执行以下操作:
dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)
Run Code Online (Sandbox Code Playgroud)
您可以选择设置基于时间的分区(如果需要),如下所示:
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field='timestamp' # name of column to use for partitioning
)
Run Code Online (Sandbox Code Playgroud)
这最终创建了表:
table = bqclient.create_table(table)
print('Created table {}, partitioned on column {}'.format(
table.table_id, table.time_partitioning.field))
Run Code Online (Sandbox Code Playgroud)
BigQuery 库中有一个内置的转换器函数:
from google.cloud import bigquery
...
client = bigquery.Client()
client.schema_from_json('path/to/schema.json`)
Run Code Online (Sandbox Code Playgroud)
目前,您无法直接指定JSON架构.您必须将模式指定为包含以逗号分隔的字段列表或bigquery.TableSchema对象的字符串.
如果架构很复杂并且包含嵌套和/或重复的字段,我们建议构建一个bigquery.TableSchema对象.
这是一个bigquery.TableSchema具有嵌套和重复字段的示例对象.
from apitools.clients import bigquery
table_schema = bigquery.TableSchema()
# ‘string’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)
# ‘integer’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)
# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3060 次 |
| 最近记录: |