使用自定义流(IEnumerable <T>)

bil*_*bob 15 c# ienumerable serialization stream

我正在使用Stream将流式传输IEnumerable<T>到流中的自定义实现.我正在使用此EnumerableStream实现来执行转换.

我正在使用它在流模式下通过WCF执行流式传输.我能够IEnumerable毫无问题地将其转换为流.有一次,我在客户端,我可以反序列化并获取所有数据,但是我无法找到停止循环流的条件.我越来越:

System.Runtime.Serialization.SerializationException:解析完成之前遇到的Stream of Stream.

这是我想要实现的示例:

class Program
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything???
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }
    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, DeserializerCallback);
    }

    private static List<byte> DeserializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        var mStream = new MemoryStream();
        binFormatter.Serialize(mStream, obj);
        return mStream.ToArray().ToList();
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我没有包含自定义流.我为那些想要查看整个代码的人创建了一个小提琴.

  • 我是否需要在自定义流本身中添加一些内容以通知所有数据已被读取?
  • 是因为反序列化器和序列化器的格式不一样(我不这么认为).
  • 我也想知道为什么当我在读取函数中设置断点时,缓冲区大小会随机变化.
  • ,停止用try和catch包装代码回答这个问题,这不是我想要的答案.我想要一个不会崩溃的干净解决方案.谢谢.

如果有人能开导我会很棒!

Iva*_*oev 7

我是否需要在自定义流本身中添加一些内容以通知所有数据已被读取?

您可以,但这对于收到的Stream是不同类的WCF场景没有帮助.

有两种标准(官方,按设计)确定Stream数据结束的方法:

(1)ReadByte返回-1

返回

无符号字节转换为Int32,如果在流的末尾则为-1.

(2)调用时读取返回0count > 0

返回

读入缓冲区的总字节数.如果当前没有多个字节可用,则这可能小于请求的字节数,如果已到达流的末尾,则可以小于零(0).

不幸的是,它们都消耗当前字节(前进到下一个)并且会破坏反序列化器.

有哪些可能的解决方案?

首先,实现一些序列化/反序列化格式(协议),它允许您知道是否有更多元素要反序列化.例如,元素之前的List<T>存储Count,元素之前的T[]存储Length等.由于EnumerableStream<T>事先不知道计数,一个简单的解决方案是在每个元素之前发出一个伪字节:

private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    buf.Enqueue(1); // <--
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}
Run Code Online (Sandbox Code Playgroud)

这将允许您使用

while (stream.ReadByte() != -1)
{
    // ...
}
Run Code Online (Sandbox Code Playgroud)

其次,如果你想保留当前格式,更通用的解决方案是实现一个自定义流,它包装另一个流并PeekByte使用与标准相同的语义实现方法ReadByte,但不消耗当前字节:

public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte;

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
} 
Run Code Online (Sandbox Code Playgroud)

这基本上实现了只读前向流,具有0或1字节的预读功能.

用法如下:

using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1) 
    {
        // ...
    }
    // ...
}
Run Code Online (Sandbox Code Playgroud)

PS怎么样?

我也想知道为什么当我在读取函数中设置断点时,缓冲区大小会随机变化.

这不是随机的.BinaryFormatter内部使用BinaryReader来读取像类型化值Int32,Byte,String等,通过所需的尺寸为count,例如,4个,1,串编码的字节数(它知道,因为将它们存储在信息流中之前实际的数据,并尝试读取之前读取它实际数据)等