Jef*_*ger 12 .net c# task-parallel-library async-await
它建议使用一个ConfigureAwait(false)当你可以随时,特别是在图书馆,因为它可以帮助避免死锁和提高性能.
我编写了一个大量使用异步的库(访问数据库的Web服务).图书馆的用户遇到了僵局,经过多次痛苦的调试和修补后,我将其追踪到了单独使用await Task.Yield().我使用过的其他任何地方.ConfigureAwait(false),但是不支持Task.Yield().
对于需要等效的情况,推荐的解决方案是Task.Yield().ConfigureAwait(false)什么?
我已经读过有关如何SwitchTo移除某个方法的内容.我可以看出为什么那可能是危险的,但为什么没有相应的Task.Yield().ConfigureAwait(false)?
编辑:
为了提供我的问题的进一步背景,这里有一些代码.我正在实现一个开源库,用于访问支持异步的DynamoDB(作为AWS的服务的分布式数据库).IX-Async库IAsyncEnumerable<T>提供了许多操作返回.该库不提供从以"块"提供行的数据源生成异步枚举的好方法,即每个异步请求返回许多项.所以我有自己的通用类型.该库支持预读选项,允许用户指定在调用实际需要之前应该请求多少数据.MoveNext()
基本上,这是如何工作的,我通过调用GetMore()和传递这些块之间的状态来请求块.我将这些任务放入chunks队列并将它们出列并将它们转换为我放在单独队列中的实际结果.这个NextChunk()方法就是问题所在.根据值的不同,ReadAhead我会在完成最后一个(最多)之后保持获取下一个块,直到需要一个值但不可用(无)或仅获取超出当前使用值的下一个块(一些).因此,获取下一个块应该并行/不阻止获取下一个值.枚举器代码是:
private class ChunkedAsyncEnumerator<TState, TResult> : IAsyncEnumerator<TResult>
{
private readonly ChunkedAsyncEnumerable<TState, TResult> enumerable;
private readonly ConcurrentQueue<Task<TState>> chunks = new ConcurrentQueue<Task<TState>>();
private readonly Queue<TResult> results = new Queue<TResult>();
private CancellationTokenSource cts = new CancellationTokenSource();
private TState lastState;
private TResult current;
private bool complete; // whether we have reached the end
public ChunkedAsyncEnumerator(ChunkedAsyncEnumerable<TState, TResult> enumerable, TState initialState)
{
this.enumerable = enumerable;
lastState = initialState;
if(enumerable.ReadAhead != ReadAhead.None)
chunks.Enqueue(NextChunk(initialState));
}
private async Task<TState> NextChunk(TState state, CancellationToken? cancellationToken = null)
{
await Task.Yield(); // ** causes deadlock
var nextState = await enumerable.GetMore(state, cancellationToken ?? cts.Token).ConfigureAwait(false);
if(enumerable.ReadAhead == ReadAhead.All && !enumerable.IsComplete(nextState))
chunks.Enqueue(NextChunk(nextState)); // This is a read ahead, so it shouldn't be tied to our token
return nextState;
}
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if(results.Count > 0)
{
current = results.Dequeue();
return TaskConstants.True;
}
return complete ? TaskConstants.False : MoveNextAsync(cancellationToken);
}
private async Task<bool> MoveNextAsync(CancellationToken cancellationToken)
{
Task<TState> nextStateTask;
if(chunks.TryDequeue(out nextStateTask))
lastState = await nextStateTask.WithCancellation(cancellationToken).ConfigureAwait(false);
else
lastState = await NextChunk(lastState, cancellationToken).ConfigureAwait(false);
complete = enumerable.IsComplete(lastState);
foreach(var result in enumerable.GetResults(lastState))
results.Enqueue(result);
if(!complete && enumerable.ReadAhead == ReadAhead.Some)
chunks.Enqueue(NextChunk(lastState)); // This is a read ahead, so it shouldn't be tied to our token
return await MoveNext(cancellationToken).ConfigureAwait(false);
}
public TResult Current { get { return current; } }
// Dispose() implementation omitted
}
Run Code Online (Sandbox Code Playgroud)
我没有声称这段代码是完美的.对不起它太长了,不知道如何简化.重要的部分是NextChunk方法和呼吁Task.Yield().此功能通过静态构造方法使用:
internal static class AsyncEnumerableEx
{
public static IAsyncEnumerable<TResult> GenerateChunked<TState, TResult>(
TState initialState,
Func<TState, CancellationToken, Task<TState>> getMore,
Func<TState, IEnumerable<TResult>> getResults,
Func<TState, bool> isComplete,
ReadAhead readAhead = ReadAhead.None)
{ ... }
}
Run Code Online (Sandbox Code Playgroud)
确切的等价物Task.Yield().ConfigureAwait(false)(因为ConfigureAwait是一个方法Task并且Task.Yield返回一个自定义的等待而不存在)只是使用Task.Factory.StartNewwith CancellationToken.None,TaskCreationOptions.PreferFairness和TaskScheduler.Current.但是,在大多数情况下Task.Run(使用默认值TaskScheduler)足够接近.
您可以通过查看源来验证它,YieldAwaiter并查看它使用ThreadPool.QueueUserWorkItem/ ThreadPool.UnsafeQueueUserWorkItemwhen TaskScheduler.Current是默认值(即线程池),Task.Factory.StartNew何时不是.
然而,您可以创建自己的等待(就像我做的那样)模仿,YieldAwaitable但无视SynchronizationContext:
async Task Run(int input)
{
await new NoContextYieldAwaitable();
// executed on a ThreadPool thread
}
public struct NoContextYieldAwaitable
{
public NoContextYieldAwaiter GetAwaiter() { return new NoContextYieldAwaiter(); }
public struct NoContextYieldAwaiter : INotifyCompletion
{
public bool IsCompleted { get { return false; } }
public void OnCompleted(Action continuation)
{
var scheduler = TaskScheduler.Current;
if (scheduler == TaskScheduler.Default)
{
ThreadPool.QueueUserWorkItem(RunAction, continuation);
}
else
{
Task.Factory.StartNew(continuation, CancellationToken.None, TaskCreationOptions.PreferFairness, scheduler);
}
}
public void GetResult() { }
private static void RunAction(object state) { ((Action)state)(); }
}
}
Run Code Online (Sandbox Code Playgroud)
注意:我不建议实际使用NoContextYieldAwaitable,这只是你问题的答案.你应该使用Task.Run(或Task.Factory.StartNew具体TaskScheduler)
我注意到你在接受现有答案后编辑了你的问题,所以也许你对这个问题的更多咆哮感兴趣.干得好 :)
建议您随时使用ConfigureAwait(false),尤其是在库中,因为它可以帮助避免死锁并提高性能.
建议这样做,只有当您完全确定您在实现中调用的任何API(包括Framework API)不依赖于同步上下文的任何属性时.这对于库代码尤为重要,如果库适合客户端和服务器端使用,则更是如此.例如,这CurrentCulture是一个常见的忽视:它对于桌面应用程序永远不会是一个问题,但它可能适用于ASP.NET应用程序.
回到你的代码:
private async Task<TState> NextChunk(...)
{
await Task.Yield(); // ** causes deadlock
var nextState = await enumerable.GetMore(...);
// ...
return nextState;
}
Run Code Online (Sandbox Code Playgroud)
最有可能的僵局是由库的客户端造成的,因为他们使用Task.Result(或Task.Wait,Task.WaitAll,Task.IAsyncResult.AsyncWaitHandle等,让他们搜索)在调用链的外框地方.虽然Task.Yield()这里是多余的,但这首先不是你的问题,而是他们的问题:它们不应该在异步API上阻塞,而应该使用"Async All as Way",正如Stephen Cleary在你链接的文章中所解释的那样.
删除Task.Yield() 可能会或可能不会解决此问题,因为enumerable.GetMore()也可以使用一些await SomeApiAsync()没有ConfigureAwait(false),从而将延续发布回调用者的同步上下文.此外," SomeApiAsync"可能恰好是一个完善的Framework API,它仍然容易陷入僵局,比如SendMailAsync,我们稍后会再回过头来看.
总的来说,你应该只是Task.Yield()因为某种原因想要立即返回调用者(将执行控制"返回"给调用者),然后异步地继续,SynchronizationContext由调用线程上安装的(或者ThreadPool,如果SynchronizationContext.Current == null).在app的核心消息循环的下一次迭代时,可以在同一线程上执行continuation well .更多细节可以在这里找到:
所以,正确的做法是避免一直阻塞代码.但是,您仍然希望使代码具有死锁功能,您不关心同步上下文,并且您确定在实现中使用的任何系统或第三方API都是如此.
然后,ThreadPoolEx.SwitchTo您可以使用,而不是重新发明(这是有充分理由删除)Task.Run,如评论中所示:
private Task<TState> NextChunk(...)
{
// jump to a pool thread without SC to avoid deadlocks
return Task.Run(async() =>
{
var nextState = await enumerable.GetMore(...);
// ...
return nextState;
});
}
Run Code Online (Sandbox Code Playgroud)
IMO,这仍然是一个黑客,具有相同的净效应,虽然比使用变体更易读ThreadPoolEx.SwitchTo().同样SwitchTo,它仍然有相关的成本:冗余线程切换可能会损害ASP.NET性能.
还有另一个(IMO更好)hack,我在这里提出解决上述问题的僵局SendMailAsync.它不会产生额外的线程切换:
private Task<TState> NextChunk(...)
{
return TaskExt.WithNoContext(async() =>
{
var nextState = await enumerable.GetMore(...);
// ...
return nextState;
});
}
public static class TaskExt
{
public static Task<TResult> WithNoContext<TResult>(Func<Task<TResult>> func)
{
Task<TResult> task;
var sc = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(null);
task = func(); // do not await here
}
finally
{
SynchronizationContext.SetSynchronizationContext(sc);
}
return task;
}
}
Run Code Online (Sandbox Code Playgroud)
这个hack工作的方式是暂时删除原始NextChunk方法的同步范围的同步上下文,因此不会捕获lambda中的第一个awaitcontinuation async,从而有效地解决了死锁问题.
Stephen 在回答相同问题时提供了略微不同的实现.他恢复了原来的同步上下文,无论发生在继续的线程之后(可能是一个完全不同的随机池线程).只要我不关心它,我宁愿不恢复它.IgnoreSynchronizationContextawaitawait
| 归档时间: |
|
| 查看次数: |
1920 次 |
| 最近记录: |