有一个库(lib)会把结果输出到给定的Stream对象中.我想在该库运行完成之前就开始使用这些结果.这个Stream应该是阻塞的,这样既能简化使用,也能在生产者跑得太快时避免占用过多内存;它还应该是线程安全的,使生产者和消费者可以在各自的线程中独立运行.
一旦lib完成,生产者线程应关闭流,从而通知消费者没有更多数据.
我正在考虑使用NetworkStream或匿名PipeStream,但两者都要经过内核传输数据,可能会比较慢.
有什么推荐的方案吗?
// Stream instance used by both ProduceData and ConsumeData below.
// NOTE(review): AsyncBlockingBufferedStream is not defined here; it appears to be a placeholder
// name for the blocking, thread-safe stream the question is asking for.
var stream = new AsyncBlockingBufferedStream();
// Producer side: lets the external library write its output into the shared stream,
// then closes the stream to tell the consumer that no more data will arrive.
void ProduceData()
{
    // In producer thread
    try
    {
        externalLib.GenerateData(stream);
    }
    finally
    {
        // Close even when GenerateData throws; otherwise a consumer blocked in Read
        // would keep waiting for data that never arrives.
        stream.Close();
    }
}
// Consumer side: keeps reading from the shared stream until Read returns 0.
void ConsumeData()
{
// In consumer thread
int read;
// A return value of 0 ends the loop; the producer is expected to trigger it by closing the stream.
// The Read arguments and the loop body are elided ("...") in this sketch.
while ((read = stream.Read(...)) != 0)
{ ... }
}
Yur*_*rik 15
这是我在Chris Taylor先前回答的基础上做的修订版:改为按块操作,因而更快,并修正了写入完成的通知.该回答现已标记为社区维基,欢迎大家修改.
/// <summary>
/// In-memory, forward-only stream that passes data from a producer to a consumer.
/// Every <see cref="Write"/> call enqueues a copy of the written bytes as one block in a bounded
/// <see cref="BlockingCollection{T}"/>; <see cref="Read"/> dequeues those blocks and copies them out.
/// </summary>
/// <remarks>
/// Closing the stream only marks the queue as complete for adding. Blocks that are already queued
/// stay readable, so a consumer can drain the data after the producer has closed the stream.
/// <see cref="Read"/> keeps unsynchronized per-reader state, so concurrent <see cref="Read"/> calls
/// are not supported.
/// </remarks>
public class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _blocks;

    // Block currently being consumed by Read, and the position of the next unread byte in it.
    private byte[] _currentBlock;
    private int _currentBlockIndex;

    // Set once the stream has been closed/disposed; volatile because Close and Write may run on
    // different threads.
    private volatile bool _closed;

    /// <summary>Creates the stream.</summary>
    /// <param name="streamWriteCountCache">
    /// Bounded capacity of the block queue, i.e. how many written blocks may be pending before
    /// <see cref="Write"/> blocks.
    /// </param>
    public BlockingStream(int streamWriteCountCache)
    {
        _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
    }

    public override bool CanTimeout { get { return false; } }
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length { get { throw new NotSupportedException(); } }

    /// <summary>Does nothing; written blocks are enqueued directly by <see cref="Write"/>.</summary>
    public override void Flush() { }

    /// <summary>Total number of bytes passed to <see cref="Write"/> so far.</summary>
    public long TotalBytesWritten { get; private set; }

    /// <summary>Number of completed <see cref="Write"/> calls.</summary>
    public int WriteCount { get; private set; }

    /// <summary>Not supported; getter and setter always throw <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Copies queued data into <paramref name="buffer"/>. Blocks until <paramref name="count"/>
    /// bytes have been copied, or until writing has been completed and every queued block has
    /// been consumed.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>
    /// The number of bytes copied. A value smaller than <paramref name="count"/> (including 0)
    /// means the end of the stream was reached.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        int bytesRead = 0;
        while (true)
        {
            if (_currentBlock != null)
            {
                int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                _currentBlockIndex += copy;
                bytesRead += copy;

                // Block fully consumed: drop it so the next iteration fetches a new one.
                if (_currentBlock.Length <= _currentBlockIndex)
                {
                    _currentBlock = null;
                    _currentBlockIndex = 0;
                }

                if (bytesRead == count)
                {
                    return bytesRead;
                }
            }

            // With an infinite timeout TryTake waits for the next block; it returns false only
            // when adding has been completed and the queue is empty, i.e. at end of stream.
            if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
            {
                return bytesRead;
            }
        }
    }

    /// <summary>
    /// Enqueues a copy of the given byte range as one block. Blocks while the queue is at its
    /// bounded capacity.
    /// </summary>
    /// <param name="buffer">Source array.</param>
    /// <param name="offset">Index of the first byte to write.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ArgumentException">The range does not fit inside <paramref name="buffer"/>.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    /// <exception cref="InvalidOperationException">
    /// <see cref="CompleteWriting"/> was called (thrown by the underlying collection).
    /// </exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);
        if (_closed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        // Copy the data: the caller is free to reuse its buffer as soon as Write returns.
        var newBuf = new byte[count];
        Array.Copy(buffer, offset, newBuf, 0, count);
        _blocks.Add(newBuf);
        TotalBytesWritten += count;
        WriteCount++;
    }

    /// <summary>
    /// Marks the stream as closed and completes writing. Safe to call more than once.
    /// </summary>
    /// <remarks>
    /// The block queue is intentionally NOT disposed here. The producer normally closes the
    /// stream while the consumer is still reading; disposing the queue at that point would make
    /// the consumer's next Read throw <see cref="ObjectDisposedException"/> and discard queued
    /// data. BlockingCollection.Dispose is also documented as not thread-safe. The queue is
    /// left to the garbage collector instead.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _closed = true;
            _blocks.CompleteAdding();
        }
        base.Dispose(disposing);
    }

    /// <summary>
    /// Completes writing and closes the stream. Data already written remains readable.
    /// </summary>
    public override void Close()
    {
        CompleteWriting();
        base.Close();
    }

    /// <summary>
    /// Signals that no more data will be written, so that <see cref="Read"/> reports end of
    /// stream once the queued blocks are consumed.
    /// </summary>
    public void CompleteWriting()
    {
        _blocks.CompleteAdding();
    }

    /// <summary>Validates the (buffer, offset, count) triple shared by Read and Write.</summary>
    private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("buffer.Length - offset < count");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4624 次 |
最近记录: |