Hub*_*pek 5 c# streaming wcf serialization protobuf-net
我们正在开发用于流式传输大量数据的WCF服务,因此我们选择使用WCF流功能与protobuf-net序列化相结合.
语境:
通常,一个想法是序列化服务中的对象,将它们写入流并发送.另一方面,调用者将接收Stream对象,它可以读取所有数据.
所以目前服务方法代码看起来有点像这样:
public Result TestMethod(Parameter parameter)
{
// Create response
var responseObject = new BusinessResponse { Value = "some very large data"};
// The resposne have to be serialized in advance to intermediate MemoryStream
var stream = new MemoryStream();
serializer.Serialize(stream, responseObject);
stream.Position = 0;
// ResultBody is a stream, Result is a MessageContract
return new Result {ResultBody = stream};
}
Run Code Online (Sandbox Code Playgroud)
BusinessResponse对象被序列化为MemoryStream,并从方法返回.在客户端,调用代码如下所示:
var parameter = new Parameter();
// Call the service method
var methodResult = channel.TestMethod(parameter);
// protobuf-net deserializer reads from a stream received from a service.
// while reading is performed by protobuf-net,
// on the service side WCF is actually reading from a
// memory stream where serialized message is stored
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody);
return result;
Run Code Online (Sandbox Code Playgroud)
因此,当serializer.Deserialize()调用它时,它从流中读取methodResult.ResultBody,同时在服务端WCF正在读取已从a返回的MemoryStream TestMethod.
问题:
我们想要实现的是立即摆脱MemoryStream服务端整个对象的初始序列化.由于我们使用流式传输,因此我们希望避免在发送之前将序列化对象保留在内存中.
理念:
完美的解决方案是返回一个空的,自定义的Stream对象(from TestMethod()),并引用一个要序列化的对象(在我的示例中为'BusinessResponse'对象).所以当WCF调用Read()我的流的方法时,我在内部使用protobuf-net序列化一个对象并将其返回给调用者而不将其存储在内存中.
现在有一个问题,因为我们实际需要的是在读取流时可以逐个序列化对象的可能性.我知道这是完全不同的序列化方式 - 而不是将对象推送到序列化器,我想逐件请求序列化内容.
使用protobuf-net是否可以以某种方式进行这种序列化?
我编写了一些可能与 Marc 的门思想类似的代码。
public class PullStream : Stream
{
private byte[] internalBuffer;
private bool ended;
private static ManualResetEvent dataAvailable = new ManualResetEvent(false);
private static ManualResetEvent dataEmpty = new ManualResetEvent(true);
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
throw new NotImplementedException();
}
public override long Length
{
get { throw new NotImplementedException(); }
}
public override long Position
{
get
{
throw new NotImplementedException();
}
set
{
throw new NotImplementedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
dataAvailable.WaitOne();
if ( count >= internalBuffer.Length)
{
var retVal = internalBuffer.Length;
Array.Copy(internalBuffer, buffer, retVal);
internalBuffer = null;
dataAvailable.Reset();
dataEmpty.Set();
return retVal;
}
else
{
Array.Copy(internalBuffer, buffer, count);
internalBuffer = internalBuffer.Skip(count).ToArray(); // i know
return count;
}
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
dataEmpty.WaitOne();
dataEmpty.Reset();
internalBuffer = new byte[count];
Array.Copy(buffer, internalBuffer, count);
Debug.WriteLine("Writing some data");
dataAvailable.Set();
}
public void End()
{
dataEmpty.WaitOne();
dataEmpty.Reset();
internalBuffer = new byte[0];
Debug.WriteLine("Ending writes");
dataAvailable.Set();
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个简单的流后代类,仅实现读取和写入(和结束)。当没有数据可用时,读取会阻塞;当数据可用时,写入会阻塞。这样就只涉及一个字节缓冲区。其余的 linq 复制已开放以进行优化;-) 添加了 End 方法,因此在没有数据可用时执行 Read 时不会发生阻塞,并且不会再写入任何数据。
您必须从单独的线程写入此流。我在下面展示了这一点:
// create a large object
var obj = new List<ToSerialize>();
for(int i = 0; i <= 1000; i ++)
obj.Add(new ToSerialize { Test = "This is my very loooong message" });
// create my special stream to read from
var ms = new PullStream();
new Thread(x =>
{
ProtoBuf.Serializer.Serialize(ms, obj);
ms.End();
}).Start();
var buffer = new byte[100];
// stream to write back to (just to show deserialization is working too)
var ws = new MemoryStream();
int read;
while ((read = ms.Read(buffer, 0, 100)) != 0)
{
ws.Write(buffer, 0, read);
Debug.WriteLine("read some data");
}
ws.Position = 0;
var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws);
Run Code Online (Sandbox Code Playgroud)
我希望这能解决您的问题:-) 无论如何,编写这个代码很有趣。
问候, 雅科
| 归档时间: |
|
| 查看次数: |
1856 次 |
| 最近记录: |