在 Go 服务器的接收端到达不一致的 TCP Protobuf 消息

Vik*_*vin 0 buffer tcp go protocol-buffers

我有一个“代理”,它将二进制文件解析到一个缓冲区中,每当该缓冲区被填满时,就通过 protobuf 消息将其发送到服务器,然后继续进行下一个二进制解析块,然后再次发送,等等。

在服务器上,我使用一个简单的net/conn包来侦听代理连接并在 while-for 循环中从它读取到缓冲区。当解析完成代理端,它terminate在 protobuf 消息中发送一个布尔值,表明这是最后一条消息,服务器可以继续处理收到的完整数据。

但是,如果我将调试打印留在我的发送方,这会很好地工作,使终端打印显着减慢通过connection.Write().

如果我取消注释这个记录器,那么它发送消息的速度太快,服务器处理的第一个传入消息是一个包含terminate标志的数据包,例如,它没有收到实际的有效负载,而是立即收到最后一条消息。

我知道 TCP 并没有真正区分不同的 []byte 数据包,这很可能是导致这种行为的原因。有没有更好的方法来做到这一点,任何替代方案?

伪代码代理端:

    buffer := make([]byte, 1024)
    for {
        n, ioErr := reader.Read(buffer)
        if ioErr == io.EOF {
            isPayloadFinal = true

            // Create protobuf message
            terminalMessage, err := CreateMessage_FilePackage(
                2234,
                protobuf.MessageType_PACKAGE,
                make([]byte, 1),
                isPayloadFinal,
            )
            // Send terminate message
            sendProtoBufMessage(connection, terminalMessage)
            break
        }
        // Create regular protobuf message
        message, err := CreateMessage_FilePackage(
            2234,
            protobuf.MessageType_PACKAGE,
            (buffer)[:n],
            isPayloadFinal)
        sendProtoBufMessage(connection, message)
   }
Run Code Online (Sandbox Code Playgroud)

伪代码服务器端:

    buffer := make([]byte, 2048)
    //var protoMessage protoBufMessage

    for artifactReceived != true {
        connection.SetReadDeadline(time.Now().Add(timeoutDuration))
        n, _ := connection.Read(buffer)
        decodedMessage := &protobuf.FileMessage{}
        if err := proto.Unmarshal(buffer[:n], decodedMessage); err != nil {
            log.Err(err).Msg("Error during unmarshalling")
        }

        if isPackageFinal := decodedMessage.GetIsTerminated(); isPackageFinal == true {
            artifactReceived = true
            log.Info().Msg("Artifact fully received")
            /* Do stuff here */
            break
        }
        // Handle partially arrived bytestream
        handleProtoPackage(packageMessage, artifactPath)
        } else {
            fmt.Println("INVALID PROTOBUF MESSAGE")
        }
    }
Run Code Online (Sandbox Code Playgroud)

和 proto 文件供参考:

message FilePackage{
    int32 id = 1;
    MessageType msgType = 2;
    bytes payload = 3;
    bool isTerminated = 4;

}
Run Code Online (Sandbox Code Playgroud)

Bri*_*its 5

正如您所说,最可能的原因似乎是“TCP 并没有真正区分不同的 [] 字节数据包”(TCP 流没有消息边界)。当您调用connection.Read(buffer)(我假设这connection是一个net.Conn)时,它将阻塞,直到某些数据可用(或达到读取截止日期),然后返回该数据(直至缓冲区大小)。返回的数据可能是一条消息(如您​​在测试中所见),但也可能是部分消息,或者多条消息(取决于时间和网络堆栈;您不应做出任何假设)。

protobuf的文档提供建议的技术:

如果您想将多条消息写入单个文件或流,则由您来跟踪一条消息的结束位置和下一条消息的开始位置。Protocol Buffer 有线格式不是自定界的,因此 Protocol Buffer 解析器无法自行确定消息的结束位置。解决这个问题最简单的方法是在写消息本身之前写下每条消息的大小。当您读回消息时,您读取大小,然后将字节读入单独的缓冲区,然后从该缓冲区解析。

如果您采用这种方法,那么您可以io.ReadFull在接收数据时使用(因为您将知道该大小需要多少字节,然后使用它来接收数据包)。