在C#/ .NET中为producer/cosumer实现异步流

Yur*_*rik 9 .net c# stream

有一个lib将其结果输出到给定的Stream对象中.我想在lib完成之前开始使用结果.Stream应该是阻塞的,以简化使用并避免过多的内存消耗,如果生产者提前跑得太远; 线程安全,允许生产者和消费者的独立存在.

一旦lib完成,生产者线程应关闭流,从而通知消费者没有更多数据.

我正在考虑使用NetworkStream或PipeStream(匿名),但两者都可能因为通过内核发送数据而变慢.

任何推荐?


var stream = new AsyncBlockingBufferedStream();

void ProduceData() { // In producer thread externalLib.GenerateData(stream); stream.Close(); }

void ConsumeData() { // In consumer thread int read; while ((read = stream.Read(...)) != 0) { ... } }

Yur*_*rik 15

根据Chris Taylor先前的回答,这是我自己的修订版,具有更快的基于块的操作和更正的写入完成通知.它现在标记为维基,所以你可以改变它.

public class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _blocks;
    private byte[] _currentBlock;
    private int _currentBlockIndex;

    public BlockingStream(int streamWriteCountCache)
    {
        _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
    }

    public override bool CanTimeout { get { return false; } }
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() {}
    public long TotalBytesWritten { get; private set; }
    public int WriteCount { get; private set; }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        int bytesRead = 0;
        while (true)
        {
            if (_currentBlock != null)
            {
                int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                _currentBlockIndex += copy;
                bytesRead += copy;

                if (_currentBlock.Length <= _currentBlockIndex)
                {
                    _currentBlock = null;
                    _currentBlockIndex = 0;
                }

                if (bytesRead == count)
                    return bytesRead;
            }

            if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
                return bytesRead;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        var newBuf = new byte[count];
        Array.Copy(buffer, offset, newBuf, 0, count);
        _blocks.Add(newBuf);
        TotalBytesWritten += count;
        WriteCount++;
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (disposing)
        {
            _blocks.Dispose();
        }
    }

    public override void Close()
    {
        CompleteWriting();
        base.Close();
    }

    public void CompleteWriting()
    {
        _blocks.CompleteAdding();
    }

    private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
            throw new ArgumentException("buffer.Length - offset < count");
    }
}
Run Code Online (Sandbox Code Playgroud)