tza*_*man 64 c++ java serialization protocol-buffers
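I'm trying to read and write multiple Protocol Buffers messages from files, in both C++ and Java. Google suggests writing a length prefix before each message, but there is no way to do that by default (that I can see).
However, the Java API in version 2.1.0 received a set of "Delimited" I/O functions which apparently do that job: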
parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo
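Are there C++ equivalents? If not, what is the wire format of the size prefix the Java API attaches, so that I can parse those messages in C++?
These now exist in C++ in google/protobuf/util/delimited_message_util.h as of v3.3.0.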
Ken*_*rda 75
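I'm a bit late to the party here, but the implementations below include some optimizations missing from the other answers and will not fail after 64MB of input (they still enforce the 64MB limit on each individual message, just not on the whole stream).
(I am the author of the C++ and Java protobuf libraries, but I no longer work for Google. Sorry that this code never made it into the official library. This is what it would look like if it had.)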
#include <cstdint>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}
tza*_*man 17
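Okay, so I haven't been able to find top-level C++ functions that implement what I need, but some spelunking through the Java API reference turned up the following, inside the MessageLite interface: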
void writeDelimitedTo(OutputStream output)
/* Like writeTo(OutputStream), but writes the size of
the message as a varint before writing the data. */
So the Java size prefix is a (Protocol Buffers) varint!
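To make the prefix concrete, here is a minimal sketch of base-128 varint encoding in plain Python 3, with no protobuf dependency. The names `encode_varint` and `frame` are hypothetical helpers made up for illustration, not part of any protobuf API, and the payload stands in for an already-serialized message.

```python
def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint.

    Each output byte carries 7 bits of the value, least-significant group
    first; the high bit is set on every byte except the last one.
    """
    if value < 0:
        raise ValueError("a length prefix cannot be negative")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # more bytes follow
        else:
            out.append(low7)         # last byte, high bit clear
            return bytes(out)


def frame(payload):
    """Prefix already-serialized message bytes with their varint length."""
    return encode_varint(len(payload)) + payload


print(encode_varint(300).hex())  # ac02
print(frame(b"abc"))             # b'\x03abc'
```

Messages shorter than 128 bytes therefore cost a single prefix byte, which is why a varint is a natural choice for streams of small messages.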
Armed with that information, I went digging through the C++ API and found the CodedStream header, which has these:
bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)
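Using those, I should be able to roll my own C++ functions that do the job.
They should really add this to the main Message API, though. It is missing functionality, considering that Java has it, and so does Marc Gravell's excellent protobuf-net C# port (via SerializeWithLengthPrefix and DeserializeWithLengthPrefix).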
Yuk*_*iko 12
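I solved the same problem using CodedOutputStream/ArrayOutputStream to write the message (with its size) and CodedInputStream/ArrayInputStream to read the message (with its size).
For example, the following pseudo-code writes the message size followed by the message: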
const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;  // stands for any generated message type

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

// Fixed 4-byte little-endian size prefix. Note that this is not the varint
// prefix written by Java's writeDelimitedTo.
codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);
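When writing, you should also check that your buffer is large enough to fit the message (including the size). When reading, you should check that your buffer contains a whole message (including the size).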
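Those two checks can be sketched in plain Python 3 with the standard `struct` module. This assumes the same fixed 4-byte little-endian prefix as the pseudo-code above; `pack_into_buffer` and `unpack_from_buffer` are hypothetical names chosen for illustration, and the payload stands in for serialized message bytes.

```python
import struct

PREFIX = struct.Struct("<I")  # 4-byte little-endian unsigned length


def pack_into_buffer(buf, payload):
    """Write prefix + payload at the start of a bytearray.

    Returns the number of bytes used, or raises ValueError when the
    buffer cannot hold the prefix and the payload together.
    """
    needed = PREFIX.size + len(payload)
    if needed > len(buf):
        raise ValueError("buffer too small: need %d, have %d" % (needed, len(buf)))
    PREFIX.pack_into(buf, 0, len(payload))
    buf[PREFIX.size:needed] = payload
    return needed


def unpack_from_buffer(buf, available):
    """Return the payload if the first `available` bytes of buf hold a
    complete frame, otherwise None (more data has to arrive first)."""
    if available < PREFIX.size:
        return None  # the prefix itself is incomplete
    (size,) = PREFIX.unpack_from(buf, 0)
    if available < PREFIX.size + size:
        return None  # prefix is there, but the message is truncated
    return bytes(buf[PREFIX.size:PREFIX.size + size])


buf = bytearray(256)
used = pack_into_buffer(buf, b"hello")
print(used)                                # 9
print(unpack_from_buffer(buf, used))       # b'hello'
print(unpack_from_buffer(buf, used - 1))   # None
```

Returning None for an incomplete frame, rather than raising, lets a caller keep accumulating bytes and retry.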
It would definitely be handy if they added convenience methods to the C++ API similar to those provided by the Java API.
Here you go:
#include <cassert>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file, std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    // Writes a varint size prefix followed by the serialized message.
    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        // Delete the coded stream first so that it flushes into the adaptor.
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;
    IstreamInputStream *_IstreamInputStream;
    // One CodedInputStream is reused for the whole file, so its total-size
    // limit applies to the whole stream rather than to each message.
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file) : mFs(file, std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);
    }

    // Reads the next size-prefixed message of type T and prints it.
    template<class T>
    bool ReadNext()
    {
        T msg;
        uint32_t size;

        if ( !_CodedInputStream->ReadVarint32(&size) )
            return false;

        CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
        bool ret = msg.ParseFromCodedStream(_CodedInputStream);
        // Release the limit whether or not parsing succeeded.
        _CodedInputStream->PopLimit(msgLimit);

        if ( ret )
            std::cout << "FASReader ReadNext: " << msg.DebugString() << std::endl;

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};
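IstreamInputStream is very fragile with respect to EOFs and other errors that easily occur when it is used together with std::istream. After such an error the protobuf streams are permanently damaged, and any buffered data that was already consumed is lost. Protobuf has proper support for reading from traditional streams.
Implement
google::protobuf::io::CopyingInputStream
and use it together with CopyingInputStreamAdaptor. Do the same for the output variants.
In practice, a parsing call ends up in google::protobuf::io::CopyingInputStream::Read(void* buffer, int size), where a buffer is given. All that is left to do is read into it somehow.
Here is an example for use with Asio synchronous streams (SyncReadStream/SyncWriteStream):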
#include <cstddef>
#include <boost/asio.hpp>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;

template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};

template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}

// Returns the number of bytes read, 0 on EOF, or -1 on error.
template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return static_cast<int>(bytes_read);
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}

template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};

template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}

template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
    boost::system::error_code ec;
    // Write() must send all `size` bytes. write_some() may send only part of
    // the buffer, so use the free function boost::asio::write(), which loops
    // until everything is written or an error occurs.
    boost::asio::write(m_Socket, boost::asio::buffer(buffer, size), ec);
    return !ec;
}
Usage:
AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is an instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
}

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!protoMessage.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);
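I ran into the same issue in both C++ and Python.
For the C++ version, I used a mix of the code Kenton Varda posted in this thread and the code from the pull request he sent to the protobuf team (because the version posted here does not handle EOF, while the one he sent to GitHub does).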
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>

bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }
    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;

    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        // EOF is "clean" only if no byte of the next prefix was consumed.
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }

    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}
And here is my python2 implementation:
from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like object
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None
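It might not be the best-looking code, and I'm sure it can be refactored a fair bit, but at least it should show you one way to do it.
Now the big problem: it's SLOW.
Even when using the C++ implementation of python-protobuf, it is an order of magnitude slower than pure C++. I have a benchmark in which I read 10M protobuf messages of ~30 bytes each from a file. It takes ~0.9s in C++ and 35s in Python.
One way to make it a bit faster would be to re-implement the varint decoder so that it reads from the file and decodes in one go, instead of reading from the file and then decoding as this code currently does. (Profiling shows that a significant amount of time is spent in the varint encoder/decoder.) Needless to say, that alone is not enough to close the gap between the Python version and the C++ version.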
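As a rough sketch of that idea, the decoder below folds each byte into the result as it is read, instead of collecting the raw bytes first and decoding them afterwards. It is written for Python 3 and plain binary streams; `read_varint32` and `read_delimited_payloads` are hypothetical names, the payloads are raw bytes that would still have to be passed to `ParseFromString`, and whether this recovers any meaningful part of the gap has not been measured here.

```python
import io


def read_varint32(stream):
    """Read and decode one varint directly from a binary stream.

    Returns None on a clean EOF (no byte of a new prefix was read) and
    raises EOFError if the stream ends in the middle of a varint.
    """
    result = 0
    shift = 0
    while True:
        b = stream.read(1)
        if not b:
            if shift == 0:
                return None
            raise EOFError("stream ended inside a varint")
        byte = b[0]                       # indexing bytes gives an int in Python 3
        result |= (byte & 0x7F) << shift  # low 7 bits are payload
        if not (byte & 0x80):             # high bit clear: last byte
            return result
        shift += 7
        if shift >= 35:                   # a 32-bit varint has at most 5 bytes
            raise ValueError("varint too long for a 32-bit length")


def read_delimited_payloads(stream):
    """Yield the raw bytes of each length-prefixed record in the stream."""
    while True:
        size = read_varint32(stream)
        if size is None:
            return
        data = stream.read(size)
        if len(data) < size:
            raise EOFError("Unexpected end of file")
        yield data


stream = io.BytesIO(b"\x03abc\x02hi")
print(list(read_delimited_payloads(stream)))  # [b'abc', b'hi']
```

Returning None only when no prefix byte was read mirrors the clean_eof distinction made by the C++ readDelimitedFrom above.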
Any ideas for making it faster are very welcome :)
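For completeness, I am posting an up-to-date version that works with the master version of protobuf and with Python3.
For the C++ version, it is sufficient to use the utilities in delimited_message_util.h. Here is an MWE: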
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>

#include <fcntl.h>
#include <unistd.h>

#include <deque>
#include <iostream>
#include <string>

template <typename T>
bool writeManyToFile(const std::deque<T>& messages, const std::string& filename) {
    // O_CREAT requires an explicit mode argument.
    int outfd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (outfd < 0) return false;

    google::protobuf::io::FileOutputStream fout(outfd);

    bool success = true;
    for (const auto& msg : messages) {
        success = google::protobuf::util::SerializeDelimitedToZeroCopyStream(
            msg, &fout);
        if (!success) {
            std::cout << "Writing Failed" << std::endl;
            break;
        }
    }
    // Close() flushes the buffer and closes the file descriptor itself, so no
    // separate close(outfd) is needed.
    if (!fout.Close()) success = false;
    return success;
}

template <typename T>
std::deque<T> readManyFromFile(const std::string& filename) {
    std::deque<T> out;
    int infd = open(filename.c_str(), O_RDONLY);
    if (infd < 0) return out;

    google::protobuf::io::FileInputStream fin(infd);
    bool keep = true;
    bool clean_eof = false;

    while (keep) {
        T msg;
        keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
            &msg, &fin, &clean_eof);
        if (keep)
            out.push_back(msg);
    }
    // clean_eof is false when reading stopped on a truncated or invalid message.
    if (!clean_eof)
        std::cout << "Reading stopped before a clean end of file" << std::endl;
    fin.Close();  // also closes infd
    return out;
}
For the Python3 version, building on @fireboot's answer, the only thing that needed modification is the decoding of raw_varint32:
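def getSize(raw_varint32):
    # raw_varint32 is the list of single-byte bytes objects returned by
    # readRawVarint32(). Each byte carries 7 payload bits, least-significant
    # group first, so every byte has to be folded in, not just the first one.
    # Note: under Python 3 the EOF test in readRawVarint32() has to compare
    # against b"" instead of "".
    result = 0
    shift = 0
    for b in raw_varint32:
        result |= ((ord(b) & 0x7f) << shift)
        shift += 7
    return result

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size = getSize(raw_varint32)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message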