use*_*407 15 python stream protocol-buffers
我正在处理来自spinn3r的数据,它包含序列化为字节流的多个不同的protobuf消息:
http://code.google.com/p/spinn3r-client/wiki/Protostream
"protostream是一个协议缓冲消息流,根据Google协议缓冲区规范在线上编码为长度前缀变量.该流有三个部分:头部,有效负载和尾部标记."
这似乎是protobufs的一个非常标准的用例.实际上,protobuf核心发行版为C++和Java提供了CodedInputStream.但是,似乎protobuf没有为python提供这样的工具 - '内部'工具没有为这种外部使用设置:
https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o
所以...在我去拼凑一个python varint解析器和工具来解析不同消息类型的流之前:有没有人知道这个的任何工具?
为什么protobuf缺少它?(或者我只是没找到它?)
对于protobuf而言,这似乎是一个很大的差距,特别是与thrift的"运输"和"协议"的等效工具相比.我正确地查看了吗?
Dra*_*gos 12
看起来其他答案中的代码可能会从此处解除.在使用此文件之前检查许可证,但我设法varint32使用以下代码读取s:
import sys
import myprotocol_pb2 as proto
import varint # (this is the varint.py file)
data = open("filename.bin", "rb").read() # read file as string
decoder = varint.decodeVarint32 # get a varint32 decoder
# others are available in varint.py
next_pos, pos = 0, 0
while pos < len(data):
msg = proto.Msg() # your message type
next_pos, pos = decoder(data, pos)
msg.ParseFromString(data[pos:pos + next_pos])
# use parsed message
pos += next_pos
print "done!"
Run Code Online (Sandbox Code Playgroud)
这是一个非常简单的代码,用于加载由varint32s 分隔的单个类型的消息,这些消息描述了下一个消息的大小.
更新:也可以使用以下命令直接从protobuf库中包含此文件:
from google.protobuf.internal.decoder import _DecodeVarint32
Run Code Online (Sandbox Code Playgroud)
我实现了一个小型 python 包,用于将多个 protobuf 消息序列化到一个流中,并将它们从流中反序列化。您可以通过以下方式安装它pip:
pip install pystream-protobuf
Run Code Online (Sandbox Code Playgroud)
以下是将两个 protobuf 消息列表写入文件的示例代码:
pip install pystream-protobuf
Run Code Online (Sandbox Code Playgroud)
然后vg_pb2.py从流中读取相同的消息(例如定义的对齐消息):
import stream
with stream.open("test.gam", "wb") as ostream:
ostream.write(*objects_list)
ostream.write(*another_objects_list)
Run Code Online (Sandbox Code Playgroud)
use*_*407 -3
这很简单,我可以理解为什么可能没有人费心去制作一个可重用的工具:
'''
Parses multiple protobuf messages from a stream of spinn3r data
'''
import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2
data = open('8mny44bs6tYqfnofg0ELPg.protostream').read()
def _VarintDecoder(mask):
'''Like _VarintDecoder() but decodes signed values.'''
local_ord = ord
def DecodeVarint(buffer, pos):
result = 0
shift = 0
while 1:
b = local_ord(buffer[pos])
result |= ((b & 0x7f) << shift)
pos += 1
if not (b & 0x80):
if result > 0x7fffffffffffffff:
result -= (1 << 64)
result |= ~mask
else:
result &= mask
return (result, pos)
shift += 7
if shift >= 64:
## need to create (and also catch) this exception class...
raise _DecodeError('Too many bytes when decoding varint.')
return DecodeVarint
## get a 64bit varint decoder
decoder = _VarintDecoder((1<<64) - 1)
## get the three types of protobuf messages we expect to see
header = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry = spinn3rApi_pb2.Entry()
## get the header
pos = 0
next_pos, pos = decoder(data, pos)
header.ParseFromString(data[pos:pos + next_pos])
## should check its contents
while 1:
pos += next_pos
next_pos, pos = decoder(data, pos)
delimiter.ParseFromString(data[pos:pos + next_pos])
if delimiter.delimiter_type == delimiter.END:
break
pos += next_pos
next_pos, pos = decoder(data, pos)
entry.ParseFromString(data[pos:pos + next_pos])
print entry
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
21829 次 |
| 最近记录: |