以非阻塞方式调用TaskCompletionSource.SetResult

hul*_*ist 26 c# multithreading asynchronous task-parallel-library async-await

我发现TaskCompletionSource.SetResult();在返回之前调用等待任务的代码.在我的情况下,导致死锁.

这是一个在普通版本中启动的简化版本 Thread

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}
Run Code Online (Sandbox Code Playgroud)

代码的"异步"部分看起来像这样.

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Run Code Online (Sandbox Code Playgroud)

Wait实际上嵌套在非异步调用中.

SendAwaitResponse(简化)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}
Run Code Online (Sandbox Code Playgroud)

我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续.

无论如何设置任务的结果而不继续等待代码?

应用程序是一个控制台应用程序.

Ste*_*ary 28

我发现了TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码.在我的情况下,导致死锁.

是的,我有一篇博客文章记录了这一点(AFAIK没有记录在MSDN上).死锁发生的原因有两个:

  1. 有一个混合async和阻塞代码(即一个async方法正在调用Wait).
  2. 使用安排任务延续TaskContinuationOptions.ExecuteSynchronously.

我建议从最简单的解决方案开始:删除第一件事(1).即,不要混合asyncWait调用:

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Run Code Online (Sandbox Code Playgroud)

相反,await始终使用:

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
Run Code Online (Sandbox Code Playgroud)

如果需要,你可以Wait在一个替代点进一步调用堆栈(async法).

这是我最推荐的解决方案.但是,如果你想尝试删除第二个东西(2),你可以做一些技巧:要么包装SetResult在一个Task.Run强制它到一个单独的线程(我的AsyncEx库*WithBackgroundContinuations扩展方法,这样做),或给你的线程是一个实际的上下文(比如我的AsyncContext类型)并指定ConfigureAwait(false),这将导致继续忽略该ExecuteSynchronously标志.

但是这些解决方案要比分离async和阻塞代码复杂得多.

作为旁注,请看一下TPL Dataflow ; 听起来你可能觉得它很有用.

  • 现在还有[TaskCreationOptions.RunContinuationsAsynchronously](https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcreationoptions(v = vs.110).aspx #idMembers)方法,它可以让你指定(在TaskCompletionSource构造函数中)应该异步执行continuation.但它只是.NET 4.6+. (8认同)
  • SetResult 应该只是将任务标记为完成,并且只要线程池线程可用,它的延续就应该运行。如果 SetResult 阻塞等待发生,则应将其作为严重错误归档。 (3认同)
  • @Triynko:7年前我[将其报告为错误](https://blog.stephencleary.com/2012/12/dont-block-in-asynchronous-code.html)。IIRC,它被关闭为“按设计”。 (2认同)

nos*_*tio 5

由于您的应用程序是一个控制台应用程序,它在默认同步上下文中运行,其中await将在等待任务已完成的同一线程上调用延续回调.如果你想在之后切换线程await SendAwaitResponse,你可以这样做await Task.Yield():

await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
Run Code Online (Sandbox Code Playgroud)

您可以通过存储Thread.CurrentThread.ManagedThreadId内部Task.Result并将其与当前线程的ID进行比较来进一步改进await.如果你仍然在同一个线程上,那就做await Task.Yield().

虽然我知道这SendAwaitResponse是你的实际代码的简化版本,但它仍然完全同步(你在问题中展示它的方式).你为什么期望有任何线程切换?

无论如何,您可能应该重新设计逻辑,而不是假设您当前使用的是什么线程.避免混合awaitTask.Wait(),使所有代码异步的.通常,可以Wait()在顶层(例如内部Main)的某个地方坚持使用.

将帖子调用task.SetResult(msg)ReceiverRun实际控制流向点转移,你awaittask-没有一个线程切换,因为默认的同步环境的行为.因此,执行实际消息处理的代码正在接管ReceiverRun线程.最终,SendAwaitResponse("second message").Wait()在同一个线程上调用,导致死锁.

下面是一个控制台应用程序代码,模仿您的示例.它使用await Task.Yield()inside ProcessAsync来在一个单独的线程上调度延续,因此控制流返回ReceiverRun并且没有死锁.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这与做Task.Run(() => task.SetResult(msg))内部没什么不同ReceiverRun.我能想到的唯一优势是你可以明确控制何时切换线程.通过这种方式,你可以留在同一个线程,只要可能的(例如,对于task2,task3,task4,但你仍然需要另一个线程切换后task4,以避免死锁task5.Wait()).

这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面都很糟糕.

现在,如果我们在上面的代码中替换task.Wait()await task里面ProcessAsync的任何地方,我们将不必使用,await Task.Yield并且仍然没有死锁.但是,await第一个await task1内部之后的整个调用链ProcessAsync实际上将在该ReceiverRun线程上执行.只要我们不用其他Wait()样式的调用来阻止这个线程,并且在我们处理消息时不做大量的CPU绑定工作,这种方法可能正常工作(异步IO绑定await式调用仍然应该是好的,它们实际上可能会触发隐式线程切换).

也就是说,我认为您需要一个单独的线程,其上安装了序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext).这就是你的异步代码awaits应该运行的地方.您仍然需要避免Task.Wait在该线程上使用.如果单个消息处理需要大量CPU限制工作,那么您应该使用Task.Run此类工作.对于异步IO绑定调用,您可以保持在同一个线程上.

您可能希望查看ActionDispatcher/ ActionDispatcherSynchronizationContext@StephenClearyNito异步库查找异步消息处理逻辑.希望斯蒂芬跳进来并提供更好的答案.


Tse*_*fan 1

var tcs = new TaskCompletionSource<Response>(TaskCreationOptions.RunContinuationsAsynchronously);当您想要强制添加到当前任务的延续异步执行时使用。