如何在python梁中制作通用的Protobuf Parser DoFn?

dek*_*iya 7 python protocol-buffers google-cloud-platform google-cloud-dataflow apache-beam

上下文
我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。通过在processDoFn 函数中初始化 protobuf 消息,我成功地开发了 Protobuf Parser 。

为什么需要通用 Protobuf 解析器

但是,我想知道,是否可以在 Beam 上制作一个通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 非常有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们能够使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于 Python 函数是一流的对象,我在想是否可以将 Protobuf 模式类(而不是消息实例对象)传递到 DoFn 中。我试图这样做,但我一直失败。

带有警告的成功解析器:不可推广

下面是我目前成功的 protobuf 解析器。protobuf 消息在函数内部初始化process

class ParsePubSubProtoToDict(beam.DoFn):

    def process(self, element, *args, **kwargs):
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        message = DataSchema()
        message.ParseFromString(element)

        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj
Run Code Online (Sandbox Code Playgroud)

虽然上面的 Protobuf DoFn 解析器工作很好,但它并没有推广到所有的 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现一个新的 DoFn 解析器。

我的尝试

为了使解析器适用于所有 protobuf 模式,我尝试将 protobuf 模式(在 Python 中作为类生成)传递给 DoFn。

class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj


def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))
Run Code Online (Sandbox Code Playgroud)

和其他类似的技术,但是,我所有的尝试都失败了,并显示相同的错误消息: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema

从这个错误信息中,我对问题发生的原因有两个假设:

  1. Protobuf 模式类是不可序列化的。然而,这个假设可能是错误的,因为虽然我知道pickle不能序列化 protobuf 模式,但如果我使用dill,我能够序列化 protobuf 模式。但除此之外,我仍然有点不确定python beam中的DoFn如何实现序列化(例如:当它使用dillpickle序列化事物时,对象的序列化格式是什么以使其可序列化并与DoFn兼容等)

  2. DoFn 类中的导入错误。由于函数/类作用域和数据流工作器,我遇到了几个 python beam 导入错误问题,为了解决这个问题,我不得不在需要它的函数中本地导入包,而不是在模块中全局导入。那么也许,如果我们将 protobuf 模式类传递给 DoFn,模式导入实际上是在 DoFn 之外完成的,因此 DoFn 无法正确解析 DoFn 内部的类名?


我的问题是:

  1. 为什么会出现此错误,我该如何解决此错误?
  2. 是否可以传递 protobuf 模式类?或者是否有更好的方法来实现通用 protobuf 到 python dict 解析器 DoFn 而不将 protobuf 模式类传递给 DoFn?
  3. Python 中的 DoFn 是如何工作的,我如何确保传递给 DoFn 的创建(__init__)的对象是可序列化的?梁上是否有一个 Serializable 类,我可以在其中继承,以便将不可序列化的对象转换为可序列化的对象?

非常感谢!您的帮助将不胜感激。

dek*_*iya 5

我实际上找到了一个替代解决方案来创建一个通用的 Protobuf 解析器 beam.Map

def convert_proto_to_dict(data, schema_class):
    message = schema_class()

    if isinstance(data, (str, bytes)):
        message.ParseFromString(data)
    else:
        message = data

    return MessageToDict(message, preserving_proto_field_name=True)


def run_pubsub_to_gbq_pipeline(argv):
    ... options initialization
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))
Run Code Online (Sandbox Code Playgroud)

所以首先,我创建了一个函数,它接收一个 protobuf 模式类和 protobuf 数据(当前在字节字符串中)作为参数。该函数将字符串字节数据初始化并解析为 protobuf 消息,并将 protobuf 消息转换为 python 字典。

然后这个函数被 使用beam.Map,所以现在我能够在没有beam.DoFn. 但是,我仍然很好奇为什么 protobuf 模式类在与 DoFn 一起使用时会出现问题,所以如果您知道为什么以及如何解决这个问题,请在此处分享您的答案,谢谢!