具有取消令牌的NetworkStream.ReadAsync永远不会取消

Sof*_*ion 30 .net sockets async-await .net-4.5 cancellation-token

这里的证明.
知道这段代码有什么问题吗?

    [TestMethod]
    public void TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        bool ok = Read(tcp.GetStream()).Wait(30000);
        Assert.IsTrue(ok);
    }

    async Task Read(NetworkStream stream)
    {
        using (var cancellationTokenSource = new CancellationTokenSource(5000))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

Sof*_*ion 27

我终于找到了解决方法.使用Task.WaitAny将异步调用与延迟任务(Task.Delay)组合在一起.在io任务之前经过延迟时,关闭流.这将迫使任务停止.您应该正确处理io任务上的异步异常.您应该为延迟任务和io任务添加延续任务.

它也适用于TCP连接.在另一个线程中关闭连接(您可以认为它是延迟任务线程)强制所有使用/等待此连接的异步任务停止.

- 编辑 -

@vtortola建议的另一个更清洁的解决方案:使用取消令牌注册对stream.Close的调用:

async Task Read(NetworkStream stream)
{
    using (var cancellationTokenSource = new CancellationTokenSource(5000))
    {
        using(cancellationTokenSource.Token.Register(() => stream.Close()))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 我发现的其他解决方案是在取消令牌的"Register"方法中注册一个对"Close"的调用.因此,当取消令牌被取消时,它会自动调用`Socket.Close`方法. (8认同)
  • 一点注意:在您的示例中,您在方法中创建了令牌。通常,您将令牌作为参数获取,然后您需要在“使用”中包含对 Register 的调用,以确保该调用在离开该方法后未注册,否则每次都会注册该调用:) (2认同)

usr*_*usr 18

取消是合作的.NetworkStream.ReadAsync必须合作才能取消.它很难做到这一点,因为这可能会使流处于未定义状态.从Windows TCP堆栈中读取了哪些字节,哪些没有?IO不容易取消.

反射器显示NetworkStream不会覆盖ReadAsync.这意味着它将获得Stream.ReadAsync仅抛出令牌的默认行为.没有通用的方法可以取消Stream操作,所以BCL Stream类甚至没有尝试(它不能尝试 - 没有办法做到这一点).

你应该设置一个超时Socket.

  • 太糟糕了,我认为网络操作是取消可能很有意义的情况之一,因为它们可能需要很长时间. (6认同)
  • 是。我希望BCL对所有常见的IO操作都支持`CancelIO`。它可以和`CancellationToken`很好地集成在一起。 (2认同)
  • 该文档说,使用异步方法时,将在Socket类上忽略所有超时“超时:此选项仅适用于同步接收调用。”。因此,所有.net异步网络方法都不可用,因为它们可能导致无限锁定...。 (2认同)

Ste*_*ary 5

那里出现了一些问题:

  1. CancellationTokenthrows OperationCanceledException, not TimeoutException(取消并不总是由于超时)。
  2. ReceiveTimeout不适用,因为您正在进行异步读取。即使确实如此,您也会在IOException和之间出现竞争条件OperationCanceledException
  3. 由于您是同步连接套接字,因此您需要在此测试中设置较高的超时(IIRC,默认连接超时约为 90 秒,但可以在 Windows 监视网络速度时进行更改)。
  4. 测试异步代码的正确方法是使用异步测试:

    [TestMethod]
    public async Task TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        await Read(tcp.GetStream());
    }
    
    Run Code Online (Sandbox Code Playgroud)


Ans*_*sss 5

根据 Softlion 的回答中的描述:

使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 结合起来。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务上的异步异常。并且您应该为 dealy 任务和 io 任务添加一个延续任务。

我已经制作了一些代码,可以为您提供超时的异步读取:

using System;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace ConsoleApplication2013
{
    class Program
    {
        /// <summary>
        /// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
        /// </summary>
        /// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
        /// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
        /// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
        /// <param name="buffer"> The buffer to write the data into</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
        /// <param name="amountToRead">The maximum number of bytes to read</param>
        /// <returns>
        /// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
        /// and Item2 is set to the amount of data that was read when Item1 is true
        /// </returns>
        public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
        {
            Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
            Task timeoutTask = Task.Delay(timeoutMillis);

            int amountRead = 0;

            bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
            {
                if (completedTask == timeoutTask) //the timeout task was the first to complete
                {
                    //close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
                    s.Close();
                    return false; //indicate that a timeout occurred
                }
                else //the readTask completed
                {
                    amountRead = readTask.Result;
                    return true;
                }
            });

            return new Tuple<bool, int>(result, amountRead);
        }

        #region sample usage
        static void Main(string[] args)
        {
            Program p = new Program();
            Task.WaitAll(p.RunAsync());
        }

        public async Task RunAsync()
        {
            Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);

            Console.WriteLine("Connecting...");
            s.Connect("127.0.0.1", 7894);  //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
            Console.WriteLine("Connected!");

            NetworkStream ns = new NetworkStream(s);

            byte[] buffer = new byte[1024];
            Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
            Console.WriteLine("Read task created");

            Tuple<bool, int> result = await readWithTimeoutTask;

            Console.WriteLine("readWithTimeoutTask is complete!");
            Console.WriteLine("Read succeeded without timeout? " + result.Item1 + ";  Amount read=" + result.Item2);
        }
        #endregion
    }
}
Run Code Online (Sandbox Code Playgroud)