protobuf 解组未知消息

guc*_*cki 6 go protocol-buffers

我有一个接收 protobuf 消息的监听器。然而,它不知道什么类型的消息何时到来。所以我尝试将其解组为一个interface{},以便稍后可以键入:

var data interface{}
err := proto.Unmarshal(message, data)
if err != nil {
  log.Fatal("unmarshaling error: ", err)
}
log.Printf("%v\n", data)
Run Code Online (Sandbox Code Playgroud)

但是这段代码无法编译:

cannot use data (type interface {}) as type proto.Message in argument to proto.Unmarshal:
  interface {} does not implement proto.Message (missing ProtoMessage method)
Run Code Online (Sandbox Code Playgroud)

如何在 go 中解组并随后类型转换“未知”protobuf 消息?

bla*_*een 10

首先,关于OP提出的问题,说两句话:

proto.Unmarshal无法解组为interface{}. 方法签名很明显,您必须传递一个proto.Message参数,该参数是由具体的 protobuffer 类型实现的接口。

[]byte当处理未出现在 an 中的原始 protobuffer 有效负载时Any,理想情况下,您至少有一些东西(字符串、数字等...)与字节切片结合在一起,您可以使用它来映射到具体的 protobuf 消息。

然后,您可以打开它并实例化适当的 protobuf 具体类型,然后才将该参数传递给Unmarshal

var message proto.Message
switch atLeastSomething {
    case "foo":
        message = &mypb.Foo{}
    case "bar":
        message = &mypb.Bar{}
}
_ = proto.Unmarshal(data, message)
Run Code Online (Sandbox Code Playgroud)

现在,如果字节有效负载确实未知怎么办?

作为前言,请考虑这种情况在实践中很少发生。用于以您选择的语言生成 protobuffer 类型的模式代表了一个契约,并且通过接受 protobuffer 有效负载,对于它的某些定义,您就履行了该契约。

无论如何,如果由于某种原因你必须处理一个完全未知的、神秘的、有线格式的 protobuffer 有效负载,你可以通过包从中提取一些protowire信息。

请注意,protobuf 消息的线路表示是不明确的。2不确定性的一个重要来源是用于字符串、字节、重复字段和...子消息(参考)的“长度分隔”类型 ( ) 。

您可以检索有效负载内容,但必然具有弱语义。

代码

话虽如此,这就是未知原始消息的解析器的样子。这个想法是利用protowire.ConsumeField读取原始字节片的能力。

数据模型可能是这样的:

type Field struct {
    Tag Tag
    Val Val
}

type Tag struct {
    Num int32
    Type protowire.Type
}

type Val struct {
    Payload interface{}
    Length int
}
Run Code Online (Sandbox Code Playgroud)

和解析器:

func parseUnknown(b []byte) []Field {
    fields := make([]Field, 0)
    for len(b) > 0 {
        n, t, fieldlen := protowire.ConsumeField(b)
        if fieldlen < 1 {
            return nil
        }
        field := Field{
            Tag: Tag{Num: int32(n), Type: t },
        }

        _, _, taglen := protowire.ConsumeTag(b[:fieldlen])
        if taglen < 1 {
            return nil
        }

        var (
            v interface{}
            vlen int
        )
        switch t {
        case protowire.VarintType:
            v, vlen = protowire.ConsumeVarint(b[taglen:fieldlen])

        case protowire.Fixed64Type:
            v, vlen = protowire.ConsumeFixed64(b[taglen:fieldlen])

        case protowire.BytesType:
            v, vlen = protowire.ConsumeBytes(b[taglen:fieldlen])
            sub := parseUnknown(v.([]byte))
            if sub != nil {
                v = sub
            }

        case protowire.StartGroupType:
            v, vlen = protowire.ConsumeGroup(n, b[taglen:fieldlen])
            sub := parseUnknown(v.([]byte))
            if sub != nil {
                v = sub
            }

        case protowire.Fixed32Type:
            v, vlen = protowire.ConsumeFixed32(b[taglen:fieldlen])
        }

        if vlen < 1 {
            return nil
        }

        field.Val = Val{Payload: v, Length: vlen - taglen}
        // fmt.Printf("%#v\n", field)

        fields = append(fields, field)
        b = b[fieldlen:]
    }
    return fields
}
Run Code Online (Sandbox Code Playgroud)

样本输入和输出

给定一个原型模式,例如:

message Foo {
  string a = 1;
  string b = 2;
  Bar bar = 3;
}

message Bar {
  string c = 1;
}
Run Code Online (Sandbox Code Playgroud)

在 Go 中初始化为:

&test.Foo{A: "A", B: "B", Bar: &test.Bar{C: "C"}}
Run Code Online (Sandbox Code Playgroud)

通过fmt.Printf("%#v\n", field)在上面的代码中的循环末尾添加一条语句,它将输出以下内容:

main.Field{Tag:main.Tag{Num:1, Type:2}, Val:main.Val{Payload:[]uint8{0x41}, Length:1}}
main.Field{Tag:main.Tag{Num:2, Type:2}, Val:main.Val{Payload:[]uint8{0x42}, Length:1}}
main.Field{Tag:main.Tag{Num:1, Type:2}, Val:main.Val{Payload:[]uint8{0x43}, Length:1}}
main.Field{Tag:main.Tag{Num:3, Type:2}, Val:main.Val{Payload:[]main.Field{main.Field{Tag:main.Tag{Num:1, Type:2}, Val:main.Val{Payload:[]uint8{0x43}, Length:1}}}, Length:3}}
Run Code Online (Sandbox Code Playgroud)

关于子消息

正如您从上面看到的,处理protowire.BytesType可能是也可能不是消息字段的想法是尝试递归地解析它。如果成功,我们保留结果msg并将其存储在字段值中,如果失败,我们按原样存储字节,这可能是一个 protostringbytes. 顺便说一句,如果我没看错的话,这似乎是 Marc Gravell 在Protogen 代码中所做的事情。

关于重复字段

上面的代码没有明确处理重复字段,但是解析完成后,重复字段将具有相同的值Field.Tag.Num。由此看来,将字段打包到切片/数组中应该很简单。

关于地图

上面的代码也不处理原始地图。我怀疑映射在语义上等同于重复的 k/v 对,例如:

message Pair {
    string key = 1; // or whatever key type
    string val = 2; // or whatever val type
}
Run Code Online (Sandbox Code Playgroud)

如果我的假设是正确的,那么可以使用给定的代码将地图解析为子消息。

关于oneof我们

我还没有对此进行测试,但我希望有关联合类型的信息完全丢失。字节有效负载将仅包含实际设置的值。

但是关于Any

原型Any与图片不符。与它看起来的相反,Any 它并不类似于map[string]interface{}JSON 对象。原因很简单:Any是一个具有非常明确结构的原型消息,即(在 Go 中):

type Any struct {
    // unexported fields
    TypeUrl string // struct tags omitted
    Value []byte   // struct tags omitted
}
Run Code Online (Sandbox Code Playgroud)

因此它更类似于 Go 的实现,interface{}因为它保存一些实际数据和该数据的类型信息。

它可以保存任意原始有效负载(及其类型信息!),但它不能用于解码未知消息,因为它恰好Any具有这两个字段,类型 url 和字节有效负载。


总之,这个答案没有提供成熟的生产级解决方案,但它展示了如何解码任意有效负载,同时保留尽可能多的原始语义。希望它能为您指明正确的方向。


Daz*_*kin -1

正如您所看到的,以及评论者指出的,您不能使用proto.Unmarshalto ,因为该方法需要一个实现接口的interface{}类型。MessageMessageV1

Protobuf 消息是类型化的并对应于方法调用(“进来”),并且实现不能采用通用类型的 protobuf,而是采用特定的 protobuf:

func (s *server) M(ctx context.Context, _ *pb.Foo) (*pb.Bar, error)
Run Code Online (Sandbox Code Playgroud)

解决方案是将泛型类型封装在Any特定类型中Envelope

message Envelope {
  google.protobuf.Any content = 1;
  ...
}
Run Code Online (Sandbox Code Playgroud)

然后将其content作为 a 进行传输[]byte(请参阅 Golang anypb.Any),并且实现 ( anypb) 包括打包|解包它们的方法。

“技巧”Any是消息包含一个TypeURL唯一标识消息的 [ ],以便接收者知道如何接收Unmarshal它。