反序列化消息而不将整个文件加载到内存中?

kri*_*nab 6 python protocol-buffers

我正在使用 Google Protocol Buffers 和 Python 来解码一些大数据文件——每个文件 200MB。我下面有一些代码展示了如何解码分隔流,它工作得很好。然而,它使用read()将整个文件加载到内存中然后迭代它的命令。

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
    n = 0
    while n < len(buf):
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        msg_buf = buf[n:n+msg_len]
        n += msg_len
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        # do something with read_metric
        print(read_row)
Run Code Online (Sandbox Code Playgroud)

请注意,此代码来自另一篇 SO 帖子,但我不记得确切的网址。我想知道是否有readlines()协议缓冲区的等效项允许我一次读取一条分隔消息并对其进行解码?我基本上想要一个不受 RAM 限制的管道,我必须加载文件。

似乎有一个pystream-protobuf包支持其中一些功能,但它已经有一两年没有更新了。7年前的帖子也问过类似的问题。但我想知道从那以后是否有任何新信息。

从流中读取多个 protobuf 消息的 python 示例

jpa*_*jpa 5

如果可以一次加载一条完整消息,那么通过修改您发布的代码来实现这一点非常简单:

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read(10) # Maximum length of length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, 0)
        buf = buf[new_pos:]
        # read rest of the message
        buf += f.read(msg_len - len(buf))
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(buf)
        buf = buf[msg_len:]
        # do something with read_metric
        print(read_row)
        # read length prefix for next message
        buf += f.read(10 - len(buf))
Run Code Online (Sandbox Code Playgroud)

这会读取 10 个字节,足以解析长度前缀,然后在已知长度后读取消息的其余部分。

字符串突变在 Python 中效率不高(它们会生成大量数据副本),因此bytearray如果您的单个消息也很大,则使用字符串突变可以提高性能。