我目前是在 Python 中使用 Apache Beam 和 Dataflow runner 的新手。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我对 Beam Python API 进行了修改并找到了一个解决方案。然而,在我的探索过程中,我遇到了一些有趣的问题,让我很好奇。
目前,我成功地从 GCS 批量发布数据的光束管道如下所示:
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> …Run Code Online (Sandbox Code Playgroud)