.NET asynchronous stream read/write

Xpi*_*itO 47 .net c# concurrency asynchronous

I've been trying to solve this "Concurrent Programming" exam exercise (in C#):

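Knowing that the Stream class contains the methods int Read(byte[] buffer, int offset, int size) and void Write(byte[] buffer, int offset, int size), implement in C# the NetToFile method, which copies all data received from the NetworkStream net instance to the FileStream file instance. To perform the transfer, use asynchronous reads and synchronous writes, so that no thread is blocked during read operations. The transfer ends when the net read operation returns the value 0. To simplify, it is not necessary to support controlled cancellation of the operation.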

void NetToFile(NetworkStream net, FileStream file);

I've been trying to solve this, but I'm struggling with a question about the problem itself. But first, here is my code:

public static void NetToFile(NetworkStream net, FileStream file) {
    byte[] buffer = new byte[4096]; // 4 KB buffer
    int nBytesRead = 0; // number of bytes read on each cycle

    IAsyncResult ar;
    do {
        // read partial content of net (asynchronously), always into the start of the buffer
        ar = net.BeginRead(buffer, 0, buffer.Length, null, null);
        // wait until read is completed
        ar.AsyncWaitHandle.WaitOne();
        // get number of bytes read on each cycle
        nBytesRead = net.EndRead(ar);

        // write partial content to file (synchronously)
        file.Write(buffer, 0, nBytesRead);
    }
    while (nBytesRead > 0);
}

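My problem is that the problem statement says:

To perform the transfer, use asynchronous reads and synchronous writes, so that no thread is blocked during read operations

I'm not sure my solution accomplishes what this exercise asks for, because I'm using AsyncWaitHandle.WaitOne() to wait for the asynchronous read to complete.

On the other hand, I haven't really figured out what a "non-blocking" solution would be in this scenario, since the FileStream write is meant to be synchronous... For that, I have to wait until the NetworkStream read completes before continuing with the FileStream write, don't I?

Could you please help me with this?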


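[EDIT 1] Using a callback solution

OK, if I understood Mitchel Sellers' and willvv's replies, I've been advised to use a callback method to turn this into a "non-blocking" solution. Here is my code, then: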

static byte[] buffer; // buffer (static, because the static methods below use it)

public static void NetToFile(NetworkStream net, FileStream file) {
    // buffer with same dimension as file stream data
    buffer = new byte[file.Length];
    //start asynchronous read
    net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}

//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
    //NetworkStream retrieve
    NetworkStream net = (NetworkStream) ar.AsyncState;
    //get number of bytes read
    int nBytesRead = net.EndRead(ar);

    //write content to file
    //... and now, how do I write to FileStream instance without
    //having its reference??
    //fs.Write(buffer,0,nBytesRead);
}

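As you may have noticed, I'm stuck in the callback method, because I don't have a reference to the FileStream instance on which I want to call the "Write(...)" method.

Also, this is not a thread-safe solution, because the byte[] field is exposed and may be shared between concurrent NetToFile calls. I don't know how to solve this without exposing the byte[] field in the outer scope... and I'm almost sure it should not be exposed this way.

I don't want to use a lambda or anonymous-method solution, because that's not in the curriculum of the "Concurrent Programming" course.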
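A sketch of one way past the "no reference to the FileStream" problem without lambdas, anonymous methods or a shared field: pass everything the callback needs through the last (state) argument of BeginRead and read it back from IAsyncResult.AsyncState. Assumptions that are not part of the exercise: the parameters are typed as Stream so the code can be tried with MemoryStream, a hypothetical ManualResetEventSlim parameter is used only to signal completion, a ValueTuple stands in for a small state class, and local functions are used only to keep the sample in one file (in the course they would be static methods of a class). Exceptions thrown inside the callback are not handled.

```csharp
using System;
using System.IO;
using System.Threading;

// Demo: MemoryStreams stand in for the NetworkStream and the FileStream.
var data = new byte[10_000];
new Random(1).NextBytes(data);
var source = new MemoryStream(data);
var destination = new MemoryStream();
var done = new ManualResetEventSlim(false);

NetToFile(source, destination, done);
// NetToFile returns at once; the demo waits only so it can inspect the result.
if (!done.Wait(TimeSpan.FromSeconds(10)))
    throw new TimeoutException("copy did not finish");
Console.WriteLine($"copied {destination.Length} bytes");

// Starts the transfer and returns; no thread waits while a read is pending.
// Everything the callback needs travels in the 'state' argument of BeginRead,
// so there is no shared static buffer and no lambda or anonymous method.
void NetToFile(Stream net, Stream file, ManualResetEventSlim completed)
{
    var state = (Net: net, Target: file, Buffer: new byte[4096], Completed: completed);
    net.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
}

// Read callback: complete the read, write synchronously, then start the next read.
void OnEndRead(IAsyncResult ar)
{
    var state = ((Stream Net, Stream Target, byte[] Buffer, ManualResetEventSlim Completed))ar.AsyncState!;
    int nBytesRead = state.Net.EndRead(ar);
    if (nBytesRead == 0)
    {
        state.Completed.Set(); // the read returned 0: the transfer is over
        return;
    }
    state.Target.Write(state.Buffer, 0, nBytesRead); // synchronous write
    state.Net.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
}
```

Each call allocates its own buffer inside the state, so concurrent NetToFile calls do not share it, and the calling thread is never parked on AsyncWaitHandle.WaitOne().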

Nic*_*rey 52

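Although it goes against the grain to help people with their homework, given that this is more than a year old, here is the proper way to accomplish this. All you need to do is overlap your read/write operations; there is no need to spawn additional threads, or anything else.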

using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
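    // Note: Stream has had instance methods CopyTo(Stream) and CopyTo(Stream, int) since .NET 4,
    // and instance methods win over extension methods, so input.CopyTo(output) calls the BCL
    // version. These overloads are reached only as StreamExtensions.CopyTo(input, output).
    public static void CopyTo( this Stream input , Stream output )
    {
        // call the 3-argument overload below directly; input.CopyTo(output, n) would bind to the BCL method
        CopyTo( input , output , DEFAULT_BUFFER_SIZE ) ;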
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }


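    // Likewise, Stream.CopyToAsync(Stream) and CopyToAsync(Stream, int) are instance methods
    // since .NET 4.5, so these overloads are reached only as StreamExtensions.CopyToAsync(...).
    public static async Task CopyToAsync( this Stream input , Stream output )
    {
        await CopyToAsync( input , output , DEFAULT_BUFFER_SIZE ) ;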
        return;
    }

    public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 } ;
        int          bufno = 0 ;
        Task<int>    read  = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
        Task         write = null ;

        while ( true )
        {

            bufl[bufno] = await read ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                await write ;
            }

            // start the new write operation
            write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            await write;
        }

        await output.FlushAsync();

        // return to the caller ;
        return;
    }

}

Cheers.

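  • @Nicholas WaitOne() is not needed. MSDN: "This method [EndWrite] blocks until the I/O operation has completed" (8 upvotes)
  • It seems none of the AsyncWaitHandle.WaitOne() calls are needed. My tests show that async doesn't help; here are my results: // file -> localhost network // 1T of data (32K buffer) // async: 104 109 108 100 111 108 109 MB/s // sync: 95 98 111 124 104 117 117 MB/s (2 upvotes)
  • That's because this is single-threaded code. The point of making both the reads and the writes "asynchronous" is that they can overlap: while we wait for the read of block *n+1* to complete, the write of block *n* is in progress. Feel free to add your own solution. (2 upvotes)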
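As the first comment points out, EndWrite blocks until the write completes, and EndRead behaves the same way, so the explicit AsyncWaitHandle.WaitOne() calls can be dropped. A minimal sketch of the same double-buffered loop under that simplification, with a hypothetical OverlappedCopy name and MemoryStreams standing in for real streams. The calling thread still blocks inside EndRead/EndWrite, which is consistent with the single-threaded point made in the last comment.

```csharp
using System;
using System.IO;

var data = new byte[100_000];
new Random(2).NextBytes(data);
var input = new MemoryStream(data);
var output = new MemoryStream();
OverlappedCopy(input, output, 4096);

// Double-buffered copy: the write of block n is in flight while block n+1 is read.
// EndRead/EndWrite block until their operation finishes, so no WaitOne() is needed.
void OverlappedCopy(Stream src, Stream dst, int bufferSize)
{
    byte[][] buf = { new byte[bufferSize], new byte[bufferSize] };
    int bufno = 0;
    IAsyncResult? write = null;
    IAsyncResult read = src.BeginRead(buf[bufno], 0, bufferSize, null, null);
    while (true)
    {
        int n = src.EndRead(read);   // blocks until the current read finishes
        if (write != null)
            dst.EndWrite(write);     // blocks until the previous write finishes
        if (n == 0)
            break;                   // end of input; the last write has already completed
        write = dst.BeginWrite(buf[bufno], 0, n, null, null);
        bufno ^= 1;                  // switch to the other buffer for the next read
        read = src.BeginRead(buf[bufno], 0, bufferSize, null, null);
    }
    dst.Flush();
}
```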

Joh*_*ren 18

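I doubt this is the fastest code (the .NET Task abstraction has some overhead), but I think it's a cleaner approach to the whole async copy thing.

I needed a CopyTransformAsync to which I could pass a delegate that does something as the chunks pass through the copy operation, for example computing a message digest while copying. That's why I was interested in rolling my own.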

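Findings:

  • CopyToAsync is sensitive to bufferSize (it needs a large buffer)
  • FileOptions.Asynchronous -> makes it terribly slow (not sure why that is)
  • The bufferSize of the FileStream objects can be smaller (it doesn't matter much)
  • The Serial test is clearly the fastest and the most resource-intensive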

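Here is what I found, along with the complete source code of the program I used to test it. On my machine these tests were run on an SSD and are equivalent to a file copy. Normally you wouldn't use this just to copy files; it's when you have a network stream (which is my use case) that you'd want something like this.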

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s

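Here you can see Process Explorer's performance graph while the tests were running. Basically, each peak (in the lower of the three graphs) is the start of the serial test. You can clearly see how throughput increases dramatically as the buffer size grows. It seems to plateau around 80K, which is what the .NET Framework CopyToAsync method uses internally.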

[Performance graph]
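The findings list above involves three separate settings. A sketch showing where each one goes, assuming a hypothetical CopyFileAsync helper and temporary files; it does no timing and does not reproduce the numbers above. 81920 bytes is the default buffer size the .NET Framework CopyToAsync uses when none is given.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

string srcPath = Path.GetTempFileName();
string dstPath = Path.GetTempFileName();
var payload = new byte[300_000];
new Random(3).NextBytes(payload);
File.WriteAllBytes(srcPath, payload);

await CopyFileAsync(srcPath, dstPath, copyBufferSize: 81920, useAsyncIo: true);

// The three knobs from the findings list:
//  - copyBufferSize: the bufferSize argument of Stream.CopyToAsync
//  - useAsyncIo:     FileOptions.Asynchronous on both FileStream objects
//  - the FileStream's own bufferSize (kept small here: 4096)
async Task CopyFileAsync(string fromPath, string toPath, int copyBufferSize, bool useAsyncIo)
{
    var options = useAsyncIo ? FileOptions.Asynchronous : FileOptions.None;
    using (var input = new FileStream(fromPath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, options))
    using (var output = new FileStream(toPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, options))
    {
        await input.CopyToAsync(output, copyBufferSize);
    }
}
```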

The nice thing here is that the final implementation isn't complicated:

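static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    , int bufferSize = 81920 // bufferSize was undeclared in this snippet; 81920 matches the BCL CopyToAsync default
    )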
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (transform != null)
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}
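The digest use case mentioned at the top of this answer can be written as a pass-through transform that feeds each chunk to an incremental hash. A sketch under assumptions: it uses IncrementalHash from System.Security.Cryptography (.NET Framework 4.7+ / .NET Core). To stay self-contained it uses a hypothetical CopyWithTransformAsync with a plain serial loop that takes the same Func<ArraySegment<byte>, ArraySegment<byte>> delegate type as CopyTransformAsync above.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading.Tasks;

var content = new byte[50_000];
new Random(4).NextBytes(content);
var src = new MemoryStream(content);
var dst = new MemoryStream();

// Pass-through transform: feeds each chunk to an incremental SHA-256 and returns it unchanged.
var hash = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
Func<ArraySegment<byte>, ArraySegment<byte>> digest = chunk =>
{
    hash.AppendData(chunk.Array!, chunk.Offset, chunk.Count);
    return chunk;
};

await CopyWithTransformAsync(src, dst, digest, 8192);
byte[] sha256 = hash.GetHashAndReset();
Console.WriteLine(BitConverter.ToString(sha256));

// Serial copy loop that applies the transform to every chunk before writing it.
async Task CopyWithTransformAsync(Stream input, Stream output,
    Func<ArraySegment<byte>, ArraySegment<byte>> transform, int bufferSize)
{
    var buffer = new byte[bufferSize];
    int read;
    while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        var chunk = transform(new ArraySegment<byte>(buffer, 0, read));
        await output.WriteAsync(chunk.Array!, chunk.Offset, chunk.Count);
    }
}
```

Because the transform runs before each chunk is written, the hash sees the chunks in stream order; the same delegate type is accepted by the interleaved version.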

Despite the large buffers, this interleaved read/write approach is about 18% faster than the BCL CopyToAsync.

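Out of curiosity, I did change the async calls to the typical Begin/End async pattern calls, and it did not improve the situation one bit; it made it worse. For all that I like to bash the Task abstraction overhead, it does some nifty things when you write your code with the async/await keywords, and that code is much nicer to read!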
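The Begin/End pairs can also be consumed with await by wrapping them in Task.Factory.FromAsync (available since .NET 4). A sketch with a hypothetical CopyViaApmAsync helper. It is serial rather than interleaved, and it is not the code the author benchmarked. Unlike AsyncWaitHandle.WaitOne(), await does not hold a thread while an operation is pending.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

var bytes = new byte[20_000];
new Random(5).NextBytes(bytes);
var inStream = new MemoryStream(bytes);
var outStream = new MemoryStream();
await CopyViaApmAsync(inStream, outStream);

// Wraps each Begin/End pair in a Task with Task.Factory.FromAsync and awaits it,
// so no thread is parked while a read or write is in progress.
async Task CopyViaApmAsync(Stream input, Stream output)
{
    var buffer = new byte[81920];
    while (true)
    {
        int n = await Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null);
        if (n == 0)
            break; // end of input
        await Task.Factory.FromAsync(
            output.BeginWrite, output.EndWrite, buffer, 0, n, null);
    }
}
```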

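  • I realize that saying thanks is discouraged on SO. However, as a C# beginner, your demonstration of what "should be done", or at least what "can be done", and especially the test code you provided with your answer, is very helpful for beginner programmers (like myself) to improve our skills. Hats off to you, sir (the points awarded are not enough!). (3 upvotes)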

ben*_*wey 12

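You will need to handle this with the callback from the NetworkStream read. Frankly, it may be easier to wrap the copy logic in its own class, so that you can keep the instances of the active Streams.

This is how I would approach it (untested):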

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public class Assignment1
{
    public static void NetToFile(NetworkStream net, FileStream file) 
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    public static void NetToFile_Option2(NetworkStream net, FileStream file) 
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream Async and writes Synchronously
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }
}


Ken*_*nzi 11

Wow, these are all very complicated! Here is my async solution, and it's just one function. Read() and BeginWrite() run at the same time.

/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
///  and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If a pending write does not complete within 60 seconds, the copy times out with an IOException.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
    // This stream copy supports a source-read happening at the same time
    // as target-write.  A simpler implementation would be to use just
    // Write() instead of BeginWrite(), at the cost of speed.

    byte[] readbuffer = new byte[4096];
    byte[] writebuffer = new byte[4096];
    IAsyncResult asyncResult = null;

    for (; ; )
    {
        // Read data into the readbuffer.  The previous call to BeginWrite, if any,
        //  is executing in the background.
        int read = source.Read(readbuffer, 0, readbuffer.Length);

        // Ok, we have read some data and we're ready to write it, so wait here
        //  to make sure that the previous write is done before we write again.
        if (asyncResult != null)
        {
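            // Wait up to 60 s for the previous write; with a 4 KB buffer that tolerates
            //  throughput down to roughly 0.07 KB/s (4096 bytes / 60 s).
            // EndWrite itself blocks without a timeout, so the timeout must be checked first.
            if (!asyncResult.AsyncWaitHandle.WaitOne(60000))
                throw new IOException("Stream write timed out.");
            target.EndWrite(asyncResult); // Last step to the 'write'; rethrows any write error.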
        }

        if (read <= 0)
            return; // source stream says we're done - nothing else to read.

        // Swap the read and write buffers so we can write what we read, and we can
        //  then use the other buffer for our next read.
        byte[] tbuf = writebuffer;
        writebuffer = readbuffer;
        readbuffer = tbuf;

        // Asynchronously write the data, asyncResult.AsyncWaitHandle will
        // be set when done.
        asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
    }
}


Shr*_*ike 9

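Strange that nobody mentioned the TPL.
There is a very good post by the PFX team (Stephen Toub) about how to implement a concurrent asynchronous stream copy. The post contains an outdated reference to the samples, so here is the correct one:
Get Parallel Extensions Extras from code.msdn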

var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait for it to finish:
task.Wait();
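Since .NET Framework 4.5 the BCL itself provides Stream.CopyToAsync, which also returns a Task, so the same usage pattern works without the extras download. A minimal sketch with MemoryStream stand-ins. In async code, await task is usually preferable to task.Wait(), which blocks the calling thread.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

var sourceData = new byte[64_000];
new Random(6).NextBytes(sourceData);
var sourceStream = new MemoryStream(sourceData);
var destinationStream = new MemoryStream();

// Built-in Stream.CopyToAsync (.NET 4.5+) returns a Task, like CopyStreamToStreamAsync.
Task task = sourceStream.CopyToAsync(destinationStream);
// do what you want with the task, for example wait for it to finish:
task.Wait();
```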

Also consider using J. Richter's AsyncEnumerator.