相关疑难解决方法(0)

为什么自定义 Python 对象不能与 ParDo Fn 一起使用?

我目前是在 Python 中使用 Apache Beam 和 Dataflow runner 的新手。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我对 Beam Python API 进行了修改并找到了一个解决方案。然而,在我的探索过程中,我遇到了一些有趣的问题,让我很好奇。

1. 成功的管道

目前,我成功地从 GCS 批量发布数据的光束管道如下所示:

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
1501
查看次数