hul*_*ist 26 c# multithreading asynchronous task-parallel-library async-await
我发现TaskCompletionSource.SetResult();在返回之前调用等待任务的代码.在我的情况下,导致死锁.
这是一个在普通版本中启动的简化版本 Thread
void ReceiverRun()
while (true)
{
var msg = ReadNextMessage();
TaskCompletionSource<Response> task = requests[msg.RequestID];
if(msg.Error == null)
task.SetResult(msg);
else
task.SetException(new Exception(msg.Error));
}
}
Run Code Online (Sandbox Code Playgroud)
代码的"异步"部分看起来像这样.
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Run Code Online (Sandbox Code Playgroud)
Wait实际上嵌套在非异步调用中.
SendAwaitResponse(简化)
public static Task<Response> SendAwaitResponse(string msg)
{
var t = new TaskCompletionSource<Response>();
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.Task;
}
Run Code Online (Sandbox Code Playgroud)
我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续.
无论如何设置任务的结果而不继续等待代码?
应用程序是一个控制台应用程序.
Ste*_*ary 28
我发现了TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码.在我的情况下,导致死锁.
是的,我有一篇博客文章记录了这一点(AFAIK没有记录在MSDN上).死锁发生的原因有两个:
async和阻塞代码(即一个async方法正在调用Wait).TaskContinuationOptions.ExecuteSynchronously.我建议从最简单的解决方案开始:删除第一件事(1).即,不要混合async和Wait调用:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Run Code Online (Sandbox Code Playgroud)
相反,await始终使用:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
Run Code Online (Sandbox Code Playgroud)
如果需要,你可以Wait在一个替代点进一步调用堆栈(未在async法).
这是我最推荐的解决方案.但是,如果你想尝试删除第二个东西(2),你可以做一些技巧:要么包装SetResult在一个Task.Run强制它到一个单独的线程(我的AsyncEx库有*WithBackgroundContinuations扩展方法,这样做),或给你的线程是一个实际的上下文(比如我的AsyncContext类型)并指定ConfigureAwait(false),这将导致继续忽略该ExecuteSynchronously标志.
但是这些解决方案要比分离async和阻塞代码复杂得多.
作为旁注,请看一下TPL Dataflow ; 听起来你可能觉得它很有用.
由于您的应用程序是一个控制台应用程序,它在默认同步上下文中运行,其中await将在等待任务已完成的同一线程上调用延续回调.如果你想在之后切换线程await SendAwaitResponse,你可以这样做await Task.Yield():
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
Run Code Online (Sandbox Code Playgroud)
您可以通过存储Thread.CurrentThread.ManagedThreadId内部Task.Result并将其与当前线程的ID进行比较来进一步改进await.如果你仍然在同一个线程上,那就做await Task.Yield().
虽然我知道这SendAwaitResponse是你的实际代码的简化版本,但它仍然完全同步(你在问题中展示它的方式).你为什么期望有任何线程切换?
无论如何,您可能应该重新设计逻辑,而不是假设您当前使用的是什么线程.避免混合await和Task.Wait(),使所有代码异步的.通常,可以Wait()在顶层(例如内部Main)的某个地方坚持使用.
将帖子调用task.SetResult(msg)从ReceiverRun实际控制流向点转移,你await的task-没有一个线程切换,因为默认的同步环境的行为.因此,执行实际消息处理的代码正在接管ReceiverRun线程.最终,SendAwaitResponse("second message").Wait()在同一个线程上调用,导致死锁.
下面是一个控制台应用程序代码,模仿您的示例.它使用await Task.Yield()inside ProcessAsync来在一个单独的线程上调度延续,因此控制流返回ReceiverRun并且没有死锁.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Run Code Online (Sandbox Code Playgroud)
这与做Task.Run(() => task.SetResult(msg))内部没什么不同ReceiverRun.我能想到的唯一优势是你可以明确控制何时切换线程.通过这种方式,你可以留在同一个线程,只要可能的(例如,对于task2,task3,task4,但你仍然需要另一个线程切换后task4,以避免死锁task5.Wait()).
这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面都很糟糕.
现在,如果我们在上面的代码中替换task.Wait()为await task里面ProcessAsync的任何地方,我们将不必使用,await Task.Yield并且仍然没有死锁.但是,await第一个await task1内部之后的整个调用链ProcessAsync实际上将在该ReceiverRun线程上执行.只要我们不用其他Wait()样式的调用来阻止这个线程,并且在我们处理消息时不做大量的CPU绑定工作,这种方法可能正常工作(异步IO绑定await式调用仍然应该是好的,它们实际上可能会触发隐式线程切换).
也就是说,我认为您需要一个单独的线程,其上安装了序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext).这就是你的异步代码awaits应该运行的地方.您仍然需要避免Task.Wait在该线程上使用.如果单个消息处理需要大量CPU限制工作,那么您应该使用Task.Run此类工作.对于异步IO绑定调用,您可以保持在同一个线程上.
您可能希望查看ActionDispatcher/ ActionDispatcherSynchronizationContext从@StephenCleary的
Nito异步库查找异步消息处理逻辑.希望斯蒂芬跳进来并提供更好的答案.
var tcs = new TaskCompletionSource<Response>(TaskCreationOptions.RunContinuationsAsynchronously);当您想要强制添加到当前任务的延续异步执行时使用。
| 归档时间: |
|
| 查看次数: |
12007 次 |
| 最近记录: |