kri*_*nab 6 python protocol-buffers
我正在使用 Google Protocol Buffers 和 Python 来解码一些大数据文件——每个文件 200MB。我下面有一些代码展示了如何解码分隔流,它工作得很好。然而,它使用read()将整个文件加载到内存中然后迭代它的命令。
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read() ## PROBLEM-LOADS ENTIRE FILE TO MEMORY.
n = 0
while n < len(buf):
msg_len, new_pos = _DecodeVarint32(buf, n)
n = new_pos
msg_buf = buf[n:n+msg_len]
n += msg_len
read_row = sfeed.standard_feed()
read_row.ParseFromString(msg_buf)
# do something with read_metric
print(read_row)
Run Code Online (Sandbox Code Playgroud)
请注意,此代码来自另一篇 SO 帖子,但我不记得确切的网址。我想知道是否有readlines()协议缓冲区的等效项允许我一次读取一条分隔消息并对其进行解码?我基本上想要一个不受 RAM 限制的管道,我必须加载文件。
似乎有一个pystream-protobuf包支持其中一些功能,但它已经有一两年没有更新了。7年前的帖子也问过类似的问题。但我想知道从那以后是否有任何新信息。
如果可以一次加载一条完整消息,那么通过修改您发布的代码来实现这一点非常简单:
import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
with open('/home/working/data/feed.pb', 'rb') as f:
buf = f.read(10) # Maximum length of length prefix
while buf:
msg_len, new_pos = _DecodeVarint32(buf, 0)
buf = buf[new_pos:]
# read rest of the message
buf += f.read(msg_len - len(buf))
read_row = sfeed.standard_feed()
read_row.ParseFromString(buf)
buf = buf[msg_len:]
# do something with read_metric
print(read_row)
# read length prefix for next message
buf += f.read(10 - len(buf))
Run Code Online (Sandbox Code Playgroud)
这会读取 10 个字节,足以解析长度前缀,然后在已知长度后读取消息的其余部分。
字符串突变在 Python 中效率不高(它们会生成大量数据副本),因此bytearray如果您的单个消息也很大,则使用字符串突变可以提高性能。
| 归档时间: |
|
| 查看次数: |
1617 次 |
| 最近记录: |