IEnumerable到Stream

Joe*_*oel 13 c# asp.net-mvc stream

我想做一些大致相当于下面代码示例的事情.我想生成并提供数据流,而不必在任何时候将整个数据集都放在内存中.

看起来我需要某种在构造函数中接受IEnumerable<string>(或IEnumerable<byte>)的Stream实现.在内部,这个Stream只会在被读取时按需推进IEnumerable.但我不知道有任何这样的Stream实现.

我的思路对吗?你知道怎样实现这样的功能吗?

    /// <summary>
    /// Builds a stream over the generated strings and returns it through <c>File</c>
    /// with content type "text/plain" and the name "Result".
    /// </summary>
    /// <returns>The value produced by <c>File</c> for the generated stream.</returns>
    public FileStreamResult GetResult()
    {
        // The string sequence is handed to ToStringStream as-is; this method never
        // enumerates it.
        Stream body = ToStringStream(Encoding.UTF8, GetDataForStream());
        return File(body, "text/plain", "Result");
    }

    /// <summary>
    /// Lazily produces the sample data: the text of each integer from 0 to 9999,
    /// each followed by a separate "\r\n" item.
    /// </summary>
    /// <returns>
    /// A deferred sequence of 20,000 strings; nothing is generated until the
    /// sequence is enumerated.
    /// </returns>
    private IEnumerable<string> GetDataForStream()
    {
        for (int i = 0; i < 10000; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    /// <summary>
    /// Placeholder for the conversion this question asks about: turning a sequence
    /// of strings into a <see cref="Stream"/> using the given encoding.
    /// </summary>
    /// <param name="encoding">Encoding intended for the strings; currently unused.</param>
    /// <param name="data">Strings intended as the stream content; currently unused.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    // Open question from the author: does this require a custom Stream implementation?
    private Stream ToStringStream(Encoding encoding, IEnumerable<string> data) =>
        throw new NotImplementedException();
Run Code Online (Sandbox Code Playgroud)

Jim*_*hel 5

我创建了一个名为ProducerConsumerStream的类来做到这一点。生产者将数据写入流,而消费者从中读取。中间有一个缓冲区,以便生产者可以“提前写”一点。您可以定义缓冲区的大小。

无论如何,即使它不完全是你要找的,我想它也能让你很好地了解这类功能该如何实现。请参阅《构建新型流》(Building a new type of stream)一文。

更新资料

链接已过时,因此我已在此处复制了代码。原始文章仍可在Wayback机器上找到,网址https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852

首先是ProducerConsumerStream类:

using System;
using System.IO;
using System.Threading;
using System.Diagnostics;

namespace Mischel.IO
{
    /// <summary>
    /// A fixed-size, blocking circular byte buffer exposed as a <see cref="Stream"/>:
    /// one thread writes into it while another thread reads from it.
    /// </summary>
    /// <remarks>
    /// This class is safe for 1 producer and 1 consumer.
    /// <para>
    /// <c>Head</c> is the index where the next byte will be written and <c>Tail</c> is the
    /// index of the most recently consumed byte. The buffer is empty when <c>Head</c> is one
    /// slot past <c>Tail</c> and full when <c>Head == Tail</c>, so one slot is never used
    /// and the usable capacity is <c>size - 1</c> bytes.
    /// </para>
    /// </remarks>
    public class ProducerConsumerStream : Stream
    {
        // The array doubles as the monitor object for all waiting and signalling.
        private readonly byte[] CircleBuff;
        private int Head;
        private int Tail;
        private bool disposed = false;

        /// <summary>True once <see cref="CompleteAdding"/> has been called.</summary>
        public bool IsAddingCompleted { get; private set; }

        /// <summary>
        /// True once adding is complete and a read has found the buffer empty.
        /// </summary>
        public bool IsCompleted { get; private set; }

        // For debugging
        private long TotalBytesRead = 0;
        private long TotalBytesWritten = 0;

        /// <summary>Creates a stream backed by a circular buffer of <paramref name="size"/> bytes.</summary>
        /// <param name="size">
        /// Size of the backing array. One slot is always kept free, so at most
        /// <c>size - 1</c> bytes can be buffered at a time.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="size"/> is less than 2. Such a buffer can never hold a byte,
        /// so every non-empty write would block or spin forever.
        /// </exception>
        public ProducerConsumerStream(int size)
        {
            if (size < 2)
            {
                throw new ArgumentOutOfRangeException(
                    "size", size, "The buffer size must be at least 2 (one slot is always kept free).");
            }

            CircleBuff = new byte[size];
            Head = 1;
            Tail = 0;
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string msg)
        {
            Console.WriteLine(msg);
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string fmt, params object[] parms)
        {
            DebugOut(string.Format(fmt, parms));
        }

        // Number of buffered bytes that have not been read yet.
        private int ReadBytesAvailable
        {
            get
            {
                if (Head > Tail)
                    return Head - Tail - 1;
                else
                    return CircleBuff.Length - Tail + Head - 1;
            }
        }

        // Number of bytes that can be written before the buffer is full.
        private int WriteBytesAvailable { get { return CircleBuff.Length - ReadBytesAvailable - 1; } }

        private void IncrementTail()
        {
            Tail = (Tail + 1) % CircleBuff.Length;
        }

        // Throws when the stream has been disposed.
        private void ThrowIfDisposed()
        {
            if (disposed)
            {
                // The single-string constructor interprets its argument as the object
                // name, so the two-argument overload is needed to carry a message.
                throw new ObjectDisposedException(GetType().Name, "The stream has been disposed.");
            }
        }

        // Validates a (buffer, offset, count) triple before any state is touched.
        private static void CheckRange(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }
            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException("offset", offset, "The offset must not be negative.");
            }
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count", count, "The count must not be negative.");
            }
            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("The offset and count exceed the length of the buffer.");
            }
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> buffered bytes into <paramref name="buffer"/>,
        /// blocking until at least one byte is available or adding has been completed.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
        /// <param name="count">Maximum number of bytes to read.</param>
        /// <returns>
        /// The number of bytes copied; 0 when <paramref name="count"/> is 0 or when the
        /// stream is empty and <see cref="CompleteAdding"/> has been called. Once the end
        /// has been reached every further call keeps returning 0.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            ThrowIfDisposed();

            // Validate before Tail is moved; a failing BlockCopy afterwards would
            // leave the indices pointing at bytes that were never delivered.
            CheckRange(buffer, offset, count);

            // End of stream is reported as 0, repeatedly, rather than by throwing.
            if (IsCompleted || count == 0)
            {
                return 0;
            }

            lock (CircleBuff)
            {
                DebugOut("Read: requested {0:N0} bytes. Available = {1:N0}.", count, ReadBytesAvailable);
                while (ReadBytesAvailable == 0)
                {
                    if (IsAddingCompleted)
                    {
                        IsCompleted = true;
                        return 0;
                    }
                    Monitor.Wait(CircleBuff);
                }

                // If Head < Tail, then there are bytes available at the end of the buffer
                // and also at the front of the buffer.
                // If reading from Tail to the end doesn't fulfill the request,
                // and there are still bytes available,
                // then read from the start of the buffer.
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                // Step onto the first unread byte.
                IncrementTail();
                int bytesToRead;
                if (Tail > Head)
                {
                    // When Tail > Head, we know that there are at least
                    // (CircleBuff.Length - Tail) bytes available in the buffer.
                    bytesToRead = CircleBuff.Length - Tail;
                }
                else
                {
                    bytesToRead = Head - Tail;
                }

                // Don't read more than count bytes!
                bytesToRead = Math.Min(bytesToRead, count);

                Buffer.BlockCopy(CircleBuff, Tail, buffer, offset, bytesToRead);
                // Leave Tail on the last byte consumed.
                Tail += (bytesToRead - 1);
                int bytesRead = bytesToRead;

                // At this point, either we've exhausted the buffer,
                // or Tail is at the end of the buffer and has to wrap around.
                if (bytesRead < count && ReadBytesAvailable > 0)
                {
                    // We haven't fulfilled the read.
                    IncrementTail();
                    // Tail is always equal to 0 here.
                    bytesToRead = Math.Min((count - bytesRead), (Head - Tail));
                    Buffer.BlockCopy(CircleBuff, Tail, buffer, offset + bytesRead, bytesToRead);
                    bytesRead += bytesToRead;
                    Tail += (bytesToRead - 1);
                }

                TotalBytesRead += bytesRead;
                DebugOut("Read: returning {0:N0} bytes. TotalRead={1:N0}", bytesRead, TotalBytesRead);
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                // Wake a producer that may be waiting for free space.
                Monitor.Pulse(CircleBuff);
                return bytesRead;
            }
        }

        /// <summary>
        /// Appends <paramref name="count"/> bytes to the buffer, blocking whenever the
        /// buffer is full until the consumer has made room.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
        /// <param name="count">Number of bytes to write.</param>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        /// <exception cref="InvalidOperationException"><see cref="CompleteAdding"/> has already been called.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            ThrowIfDisposed();
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException("The stream has been marked as complete for adding.");
            }
            CheckRange(buffer, offset, count);

            lock (CircleBuff)
            {
                DebugOut("Write: requested {0:N0} bytes. Available = {1:N0}", count, WriteBytesAvailable);
                int bytesWritten = 0;
                while (bytesWritten < count)
                {
                    while (WriteBytesAvailable == 0)
                    {
                        Monitor.Wait(CircleBuff);
                    }
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    // Write in chunks no larger than the free space currently available.
                    int bytesToCopy = Math.Min((count - bytesWritten), WriteBytesAvailable);
                    CopyBytes(buffer, offset + bytesWritten, bytesToCopy);
                    TotalBytesWritten += bytesToCopy;
                    DebugOut("Write: {0} bytes written. TotalWritten={1:N0}", bytesToCopy, TotalBytesWritten);
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    bytesWritten += bytesToCopy;
                    // Wake a consumer that may be waiting for data.
                    Monitor.Pulse(CircleBuff);
                }
            }
        }

        // Copies count bytes from buffer[srcOffset..] into the circular buffer at Head.
        // The caller guarantees count <= WriteBytesAvailable.
        private void CopyBytes(byte[] buffer, int srcOffset, int count)
        {
            // Insert at head
            // The copy might require two separate operations.

            // copy as much as can fit between Head and end of the circular buffer
            int offset = srcOffset;
            int bytesCopied = 0;
            int bytesToCopy = Math.Min(CircleBuff.Length - Head, count);
            if (bytesToCopy > 0)
            {
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                bytesCopied = bytesToCopy;
                Head = (Head + bytesToCopy) % CircleBuff.Length;
                offset += bytesCopied;
            }

            // Copy the remainder, which will go from the beginning of the buffer.
            if (bytesCopied < count)
            {
                bytesToCopy = count - bytesCopied;
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                Head = (Head + bytesToCopy) % CircleBuff.Length;
            }
        }

        /// <summary>
        /// Marks the stream as finished for writing. Reads continue to drain the
        /// buffered bytes and then return 0.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
        public void CompleteAdding()
        {
            ThrowIfDisposed();
            lock (CircleBuff)
            {
                DebugOut("CompleteAdding: {0:N0} bytes written.", TotalBytesWritten);
                IsAddingCompleted = true;
                // Release a consumer blocked waiting for data that will never arrive.
                Monitor.Pulse(CircleBuff);
            }
        }

        public override bool CanRead { get { return true; } }

        public override bool CanSeek { get { return false; } }

        public override bool CanWrite { get { return true; } }

        public override void Flush() { /* does nothing */ }

        /// <summary>Not supported; the stream is not seekable.</summary>
        public override long Length { get { throw new NotSupportedException(); } }

        /// <summary>Not supported; the stream is not seekable.</summary>
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        /// <summary>Not supported; the stream is not seekable.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; the buffer size is fixed at construction.</summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        protected override void Dispose(bool disposing)
        {
            if (!disposed)
            {
                base.Dispose(disposing);
                disposed = true;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

以及如何使用它的示例:

/// <summary>
/// Console demo for ProducerConsumerStream: one thread writes a test sentence into the
/// stream in small chunks while a second thread reads it back and verifies the bytes.
/// </summary>
class Program
{
    static readonly string TestText = "This is a test of the emergency broadcast system.";
    static readonly byte[] TextBytes = Encoding.UTF8.GetBytes(TestText);

    // Deliberately tiny so that writes wrap around the circular buffer and block.
    const int TestBufferSize = 12;

    // Different chunk sizes on each side so reads and writes never line up.
    const int ProducerBufferSize = 4;
    const int ConsumerBufferSize = 5;

    static void Main(string[] args)
    {
        Console.WriteLine("TextBytes contains {0:N0} bytes.", TextBytes.Length);
        using (var pcStream = new ProducerConsumerStream(TestBufferSize))
        {
            Thread producerThread = new Thread(ProducerThreadProc);
            Thread consumerThread = new Thread(ConsumerThreadProc);
            producerThread.Start(pcStream);
            // Give the producer a head start so it fills the buffer before the
            // consumer begins draining it.
            Thread.Sleep(2000);
            consumerThread.Start(pcStream);

            producerThread.Join();
            consumerThread.Join();
        }
        Console.Write("Done. Press Enter.");
        Console.ReadLine();
    }

    /// <summary>Writes <c>TextBytes</c> to the stream in chunks, then marks it complete.</summary>
    /// <param name="state">The ProducerConsumerStream to write to.</param>
    static void ProducerThreadProc(object state)
    {
        Console.WriteLine("Producer: started.");
        var pcStream = (ProducerConsumerStream)state;
        int offset = 0;
        // Bound the loop by the byte count, not the character count: the offsets index
        // TextBytes, and the two lengths differ as soon as the text is not pure ASCII.
        while (offset < TextBytes.Length)
        {
            int bytesToWrite = Math.Min(ProducerBufferSize, TextBytes.Length - offset);
            pcStream.Write(TextBytes, offset, bytesToWrite);
            offset += bytesToWrite;
        }
        pcStream.CompleteAdding();
        Console.WriteLine("Producer: {0:N0} total bytes written.", offset);
        Console.WriteLine("Producer: exit.");
    }

    /// <summary>Reads the stream until it reports 0 bytes, then compares with <c>TextBytes</c>.</summary>
    /// <param name="state">The ProducerConsumerStream to read from.</param>
    static void ConsumerThreadProc(object state)
    {
        Console.WriteLine("Consumer: started.");
        var instream = (ProducerConsumerStream)state;
        int testOffset = 0;

        var inputBuffer = new byte[TextBytes.Length];

        int bytesRead;
        do
        {
            // Once the buffer is full this requests 0 bytes, which ends the loop.
            int bytesToRead = Math.Min(ConsumerBufferSize, inputBuffer.Length - testOffset);
            bytesRead = instream.Read(inputBuffer, testOffset, bytesToRead);
            //Console.WriteLine("Consumer: {0:N0} bytes read.", bytesRead);
            testOffset += bytesRead;
        } while (bytesRead != 0);
        Console.WriteLine("Consumer: {0:N0} total bytes read.", testOffset);

        // Compare bytes read with TextBytes
        for (int i = 0; i < TextBytes.Length; ++i)
        {
            if (inputBuffer[i] != TextBytes[i])
            {
                Console.WriteLine("Read error at position {0}", i);
                break;
            }
        }
        Console.WriteLine("Consumer: exit.");
    }
}
Run Code Online (Sandbox Code Playgroud)


Sve*_*ckx 5

这是一个以IEnumerable<byte>作为输入的只读Stream实现:

/// <summary>
/// A read-only, forward-only <see cref="Stream"/> that pulls its bytes on demand from
/// an <see cref="IEnumerable{T}"/> of <see cref="byte"/>.
/// </summary>
/// <remarks>
/// The source is enumerated only as far as <see cref="Read"/> calls require.
/// <see cref="Length"/> always reports 0 and <see cref="Position"/> is a plain settable
/// value that reads do not update.
/// </remarks>
public class ByteStream : Stream
{
    private readonly IEnumerator<byte> _input;
    private bool _disposed;

    /// <summary>Creates a stream over <paramref name="input"/>.</summary>
    /// <param name="input">The byte sequence to expose; its enumerator is obtained immediately.</param>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public ByteStream(IEnumerable<byte> input)
    {
        if (input == null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        _input = input.GetEnumerator();
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => 0;
    public override long Position { get; set; } = 0;

    /// <summary>
    /// Copies up to <paramref name="count"/> bytes from the source sequence into
    /// <paramref name="buffer"/> starting at <paramref name="offset"/>.
    /// </summary>
    /// <returns>The number of bytes copied; 0 once the sequence is exhausted.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit in <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        // Checked up front so that no source bytes are consumed and then lost to an
        // out-of-range store.
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("The offset and count exceed the length of the buffer.");
        }

        int i = 0;
        // Test the count first so the enumerator is never advanced past what is stored.
        for (; i < count && _input.MoveNext(); i++)
        {
            buffer[i + offset] = _input.Current;
        }
        return i;
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();
    public override void SetLength(long value) => throw new InvalidOperationException();
    public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();

    /// <summary>Does nothing; there is no write buffer to flush on a read-only stream.</summary>
    public override void Flush()
    {
    }

    /// <summary>
    /// Disposes the source enumerator. Overriding this hook (instead of re-implementing
    /// <see cref="IDisposable.Dispose"/> explicitly) makes the cleanup run for every
    /// disposal path of <see cref="Stream"/>.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _input.Dispose();
            }
            _disposed = true;
        }

        base.Dispose(disposing);
    }
}
Run Code Online (Sandbox Code Playgroud)

那么,你还需要一个把IEnumerable<string>转换为IEnumerable<byte>的函数:

/// <summary>
/// Lazily converts a sequence of strings into a flat sequence of bytes, emitting the
/// encoded form of <see cref="Environment.NewLine"/> after every string.
/// </summary>
/// <param name="input">The strings to encode, in order.</param>
/// <param name="encoding">The encoding applied to each string and to the line terminator.</param>
/// <returns>A deferred byte sequence; nothing is encoded until it is enumerated.</returns>
public static IEnumerable<byte> Encode(IEnumerable<string> input, Encoding encoding)
{
    // Encoded once and reused after every item.
    byte[] terminator = encoding.GetBytes(Environment.NewLine);

    foreach (string text in input)
    {
        byte[] payload = encoding.GetBytes(text);

        for (int i = 0; i < payload.Length; i++)
        {
            yield return payload[i];
        }

        for (int i = 0; i < terminator.Length; i++)
        {
            yield return terminator[i];
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

最后,这是在控制器中使用它的方法:

/// <summary>
/// Wraps the generated strings in a <c>ByteStream</c> of UTF-8 bytes and returns it
/// through <c>File</c> with content type "text/plain" and the name "Result.txt".
/// </summary>
/// <returns>The value produced by <c>File</c> for the generated stream.</returns>
public FileResult GetResult()
{
    // Nothing is enumerated here; the byte sequence is only pulled when the stream is read.
    IEnumerable<byte> payload = Encode(GetDataForStream(), Encoding.UTF8);
    return File(new ByteStream(payload), "text/plain", "Result.txt");
}
Run Code Online (Sandbox Code Playgroud)