sha*_*red 1 python google-cloud-dataflow apache-beam
我正在通过ReadFromPubSubwith阅读消息timestamp_attribute=None,它应该将时间戳设置为发布时间。
这样,我最终得到 a PCollectionofPubsubMessage元素。
如何按顺序访问这些元素的时间戳,例如将它们保存到数据库?我能看到的唯一属性是dataand attributes,并且attributes只有来自 Pub/Sub 的键。
编辑:示例代码
with beam.Pipeline(options=pipeline_options) as p:
items = (p
| ReadFromPubSub(topic=args.read_topic, with_attributes=True)
| beam.WindowInto(beam.window.FixedWindows(args.time_window))
| 'FormatMessage' >> beam.Map(format_message)
| 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
args.project, write_disposition='WRITE_APPEND')
)
Run Code Online (Sandbox Code Playgroud)
whereformat_message将使用 aPubsubMessage并返回一个字典,表示要附加到表中的行:
def format_message(message):
formatted_message = {
'data': base64.b64encode(message.data),
'attributes': str(message.attributes)
}
return formatted_message
Run Code Online (Sandbox Code Playgroud)
结果证明可以修改映射函数以读取其他参数:
def format_message(message, timestamp=beam.DoFn.TimestampParam):
formatted_message = {
'data': base64.b64encode(message.data),
'attributes': str(message.attributes),
'timestamp': float(timestamp)
}
return formatted_message
Run Code Online (Sandbox Code Playgroud)
更多可能的参数:https : //beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
| 归档时间: |
|
| 查看次数: |
1261 次 |
| 最近记录: |