如何使用 Python 将 Google Pub/Sub 与 Google Dataflow/Beam 结合使用?

Min*_*ato 0 python google-cloud-pubsub google-cloud-dataflow

我是 Pub/Sub 和 Dataflow/Beam 的新手。我已经在 Spark 和 Kafka 中完成了一项任务,我想使用 Pub/Sub 和 Dataflow/Beam 做同样的事情。据我目前的理解,Kafka 类似于 Pub/Sub,Spark 类似于 Dataflow/Beam。

任务是获取 JSON 文件并写入 Pub/Sub 主题。然后使用 Beam/Dataflow 我需要将该数据放入 PCollection。我将如何实现这一目标?

Min*_*ato 7

我解决了上述问题。我能够从 pubsub 主题中连续读取数据,然后进行一些处理,然后将结果写入数据存储。

with beam.Pipeline(options=options) as p:

    # Read from PubSub into a PCollection.
    lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)

    # Group and aggregate each JSON object.
    transformed = (lines
                   | 'Split' >> beam.FlatMap(lambda x: x.split("\n"))
                   | 'jsonParse' >> beam.ParDo(jsonParse())
                   | beam.WindowInto(window.FixedWindows(15,0))
                   | 'Combine' >> beam.CombinePerKey(sum))

    # Create Entity.
    transformed = transformed | 'create entity' >> beam.Map(
      EntityWrapper(config.NAMESPACE, config.KIND, config.ANCESTOR).make_entity)

    # Write to Datastore.
    transformed | 'write to datastore' >> WriteToDatastore(known_args.dataset_id)
Run Code Online (Sandbox Code Playgroud)