glu*_*lux 3 python dataflow google-bigquery google-cloud-dataflow apache-beam
我正在尝试使用数据流读取 pubsub 消息并将其写入大查询。我获得了 Google 团队的 alpha 访问权限,并且已经使提供的示例正常工作,但现在我需要将其应用到我的场景中。
发布订阅负载:
Message {
data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
attributes: {}
}
Run Code Online (Sandbox Code Playgroud)
大查询架构:
schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',
Run Code Online (Sandbox Code Playgroud)
我的目标是简单地读取消息有效负载并插入到 bigquery 中。我正在努力了解转换以及如何将键/值映射到大查询模式。
我对此很陌生,因此非常感谢任何帮助。
当前代码: https: //codeshare.io/ayqX8w
谢谢!
我能够通过定义一个将 pubsub 字符串加载到 json 对象的函数来成功解析它(请参阅 parse_pubsub())。我遇到的一个奇怪的问题是我无法在全局范围内导入 json。我收到“NameError:全局名称'json'未定义”错误。我必须在函数中导入 json。
请参阅下面的我的工作代码:
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
'''Normalize pubsub string to json object'''
# Lines look like this:
# {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
import json
record = json.loads(line)
return (record['mac']), (record['status']), (record['datetime'])
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', required=True,
help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
parser.add_argument(
'--output_table', required=True,
help=
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read the pubsub topic into a PCollection.
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| beam.Map(parse_pubsub)
| beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
| beam.io.WriteToBigQuery(
known_args.output_table,
schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4801 次 |
| 最近记录: |