Why can't a custom Python object be used with a ParDo Fn?

dek*_*iya 5 python google-cloud-dataflow apache-beam

I'm new to using Apache Beam in Python with the Dataflow runner. I wanted to create a batch pipeline that publishes to Google Cloud PubSub, and after tinkering with the Beam Python APIs I found a working solution. Along the way, however, I ran into some interesting problems that made me curious.

1. Successful pipeline

Currently, my Beam pipeline that successfully publishes data in batch from GCS looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        # Block until the publish completes; emit the message ID.
        yield future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )
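For completeness, a minimal entry point for running a pipeline like this might look as follows (my sketch, not part of the original post; the command-line flags are passed straight through to PipelineOptions):

import sys

if __name__ == "__main__":
    # Forward command-line flags such as --runner=DataflowRunner,
    # --project and --temp_location to PipelineOptions.
    run_gcs_to_pubsub(sys.argv[1:])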

2. Unsuccessful pipelines

Here, I tried to make the publisher reusable across calls to the DoFn. I attempted the following approaches.

a. Initializing the publisher inside the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initializing the publisher outside the DoFn and passing it to the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    from google.cloud import pubsub_v1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts at sharing the publisher across the DoFn's methods failed with the following error message:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
Run Code Online (Sandbox Code Playgroud)

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
Run Code Online (Sandbox Code Playgroud)
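
This error can be reproduced outside Beam entirely. As an illustration (my snippet, not from the original post) of what the runner is effectively doing when it serializes the DoFn:

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

# The client holds a gRPC channel implemented in Cython, which cannot
# be pickled, so this raises:
# TypeError: no default __reduce__ due to non-trivial __cinit__
pickle.dumps(publisher)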

My questions are:

  1. Would the shared publisher implementation improve the performance of the Beam pipeline? If so, I would like to explore that solution.

  2. Why do these errors occur in my failing pipelines? Is it because a custom class object is initialized and passed to the DoFn outside of the process function? If that is the reason, how can I implement a pipeline so that I am able to reuse a custom object in a DoFn?

Thanks, your help would be greatly appreciated.

Edit: Solution

OK, Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be initialized/available during DoFn object creation (under __init__). The object must be serializable because it will be serialized during pipeline submission, when the DoFn object is created (and __init__ is called). How to achieve this is answered in my answer below; see also the sketch right after this list. In addition, I found that this requirement is actually associated with the Beam documentation under [1][2].

  2. Initialize the non-serializable object in the DoFn's methods outside __init__ to avoid serialization, since methods other than __init__ are not called during pipeline submission. How to accomplish this is explained in Ankur's answer.
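
To illustrate solution 1, here is a minimal sketch (my own illustration, not code from the original answers) of a custom object that is safe to hold in a DoFn's __init__ because its pickled state excludes the gRPC-backed client:

class PubSubPublisher(object):
    """Serializable wrapper: only plain configuration is pickled."""

    def __init__(self, topic_path):
        self.topic_path = topic_path
        self._client = None  # created lazily on the worker, never pickled

    def __getstate__(self):
        # Drop the unpicklable client; keep only plain data.
        return {'topic_path': self.topic_path}

    def __setstate__(self, state):
        self.topic_path = state['topic_path']
        self._client = None

    def publish(self, data):
        if self._client is None:
            from google.cloud import pubsub_v1
            self._client = pubsub_v1.PublisherClient()
        return self._client.publish(self.topic_path, data=data)

A DoFn can then keep self.publisher = PubSubPublisher(topic_path) in __init__ and call self.publisher.publish(...) from process, since pickling the wrapper only serializes topic_path.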

References:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

小智 5

PublisherClient cannot be pickled correctly. More on pickling here. Initializing the PublisherClient in the process method avoids the pickling of PublisherClient.

If the intention is to reuse the PublisherClient, I recommend initializing the PublisherClient in the process method and storing it on self, as in the following code.

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Create the client lazily on the worker, once per DoFn instance,
        # so it is never pickled during pipeline submission.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()
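
As a side note, newer releases of the Beam Python SDK (2.14 and later) also provide a DoFn.setup method that runs on the worker after the DoFn is deserialized, which makes it a natural home for this kind of initialization; a minimal sketch under that assumption:

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

    def setup(self):
        # Runs on the worker after deserialization, so the client is
        # never part of the pickled DoFn.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()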