jak*_*sch 14 c# asynchronous coroutine async-await
我使用C#迭代器作为协同程序的替代品,它一直很好用.我想切换到async/await,因为我认为语法更清晰,它给了我类型安全. 在这篇(过时的)博客文章中,Jon Skeet展示了实现它的可能方式.
我选择采用略有不同的方式(通过实现我自己SynchronizationContext和使用Task.Yield).这很好.
然后我意识到会有问题; 目前协程不必完成运行.它可以在任何产生的点上优雅地停止.我们可能有这样的代码:
private IEnumerator Sleep(int milliseconds)
{
Stopwatch timer = Stopwatch.StartNew();
do
{
yield return null;
}
while (timer.ElapsedMilliseconds < milliseconds);
}
private IEnumerator CoroutineMain()
{
try
{
// Do something that runs over several frames
yield return Coroutine.Sleep(5000);
}
finally
{
Log("Coroutine finished, either after 5 seconds, or because it was stopped");
}
}
Run Code Online (Sandbox Code Playgroud)
协程通过跟踪堆栈中的所有枚举器来工作.C#编译器生成一个Dispose函数,可以调用该函数以确保正确调用'finally'块CoroutineMain,即使枚举未完成.这样我们就可以优雅地停止协程,并通过调用堆栈Dispose上的所有IEnumerator对象来确保最终调用块.这基本上是手动展开.
当我用async/await编写我的实现时,我意识到我们会失去这个功能,除非我弄错了.然后我查找了其他协同解决方案,看起来Jon Skeet的版本看起来也没有任何处理方式.
我能想到处理这个问题的唯一方法就是拥有我们自己的自定义'Yield'函数,它会检查协程是否被停止,然后引发一个表明这个的异常.这将传播,执行finally块,然后被捕获到根附近的某处.我不觉得这很漂亮,因为第三方代码可能会捕获异常.
我误解了什么,这是否可以更容易地做到?或者我需要以异常方式执行此操作吗?
编辑:已请求更多信息/代码,所以这里有一些.我可以保证这只会在一个线程上运行,所以这里没有涉及线程.我们当前的协程实现看起来有点像这样(这是简化的,但它适用于这个简单的情况):
public sealed class Coroutine : IDisposable
{
private class RoutineState
{
public RoutineState(IEnumerator enumerator)
{
Enumerator = enumerator;
}
public IEnumerator Enumerator { get; private set; }
}
private readonly Stack<RoutineState> _enumStack = new Stack<RoutineState>();
public Coroutine(IEnumerator enumerator)
{
_enumStack.Push(new RoutineState(enumerator));
}
public bool IsDisposed { get; private set; }
public void Dispose()
{
if (IsDisposed)
return;
while (_enumStack.Count > 0)
{
DisposeEnumerator(_enumStack.Pop().Enumerator);
}
IsDisposed = true;
}
public bool Resume()
{
while (true)
{
RoutineState top = _enumStack.Peek();
bool movedNext;
try
{
movedNext = top.Enumerator.MoveNext();
}
catch (Exception ex)
{
// Handle exception thrown by coroutine
throw;
}
if (!movedNext)
{
// We finished this (sub-)routine, so remove it from the stack
_enumStack.Pop();
// Clean up..
DisposeEnumerator(top.Enumerator);
if (_enumStack.Count <= 0)
{
// This was the outer routine, so coroutine is finished.
return false;
}
// Go back and execute the parent.
continue;
}
// We executed a step in this coroutine. Check if a subroutine is supposed to run..
object value = top.Enumerator.Current;
IEnumerator newEnum = value as IEnumerator;
if (newEnum != null)
{
// Our current enumerator yielded a new enumerator, which is a subroutine.
// Push our new subroutine and run the first iteration immediately
RoutineState newState = new RoutineState(newEnum);
_enumStack.Push(newState);
continue;
}
// An actual result was yielded, so we've completed an iteration/step.
return true;
}
}
private static void DisposeEnumerator(IEnumerator enumerator)
{
IDisposable disposable = enumerator as IDisposable;
if (disposable != null)
disposable.Dispose();
}
}
Run Code Online (Sandbox Code Playgroud)
假设我们有如下代码:
private IEnumerator MoveToPlayer()
{
try
{
while (!AtPlayer())
{
yield return Sleep(500); // Move towards player twice every second
CalculatePosition();
}
}
finally
{
Log("MoveTo Finally");
}
}
private IEnumerator OrbLogic()
{
try
{
yield return MoveToPlayer();
yield return MakeExplosion();
}
finally
{
Log("OrbLogic Finally");
}
}
Run Code Online (Sandbox Code Playgroud)
这可以通过将OrbLogic枚举器的实例传递给Coroutine,然后运行它来创建.这允许我们每帧勾选协程.如果玩家杀死了球,则协程未完成运行 ; 只需在协程上调用Dispose即可.如果MoveTo逻辑上在'try'块中,则在语句上调用顶部的Dispose IEnumerator将使finally块处于MoveTo执行状态.然后finally,OrbLogic中的块将执行.请注意,这是一个简单的案例,案例要复杂得多.
我正在努力在async/await版本中实现类似的行为.此版本的代码如下所示(省略错误检查):
public class Coroutine
{
private readonly CoroutineSynchronizationContext _syncContext = new CoroutineSynchronizationContext();
public Coroutine(Action action)
{
if (action == null)
throw new ArgumentNullException("action");
_syncContext.Next = new CoroutineSynchronizationContext.Continuation(state => action(), null);
}
public bool IsFinished { get { return !_syncContext.Next.HasValue; } }
public void Tick()
{
if (IsFinished)
throw new InvalidOperationException("Cannot resume Coroutine that has finished");
SynchronizationContext curContext = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(_syncContext);
// Next is guaranteed to have value because of the IsFinished check
Debug.Assert(_syncContext.Next.HasValue);
// Invoke next continuation
var next = _syncContext.Next.Value;
_syncContext.Next = null;
next.Invoke();
}
finally
{
SynchronizationContext.SetSynchronizationContext(curContext);
}
}
}
public class CoroutineSynchronizationContext : SynchronizationContext
{
internal struct Continuation
{
public Continuation(SendOrPostCallback callback, object state)
{
Callback = callback;
State = state;
}
public SendOrPostCallback Callback;
public object State;
public void Invoke()
{
Callback(State);
}
}
internal Continuation? Next { get; set; }
public override void Post(SendOrPostCallback callback, object state)
{
if (callback == null)
throw new ArgumentNullException("callback");
if (Current != this)
throw new InvalidOperationException("Cannot Post to CoroutineSynchronizationContext from different thread!");
Next = new Continuation(callback, state);
}
public override void Send(SendOrPostCallback d, object state)
{
throw new NotSupportedException();
}
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
{
throw new NotSupportedException();
}
public override SynchronizationContext CreateCopy()
{
throw new NotSupportedException();
}
}
Run Code Online (Sandbox Code Playgroud)
我没有看到如何使用它实现与迭代器版本类似的行为.为漫长的代码提前道歉!
编辑2:新方法似乎正在起作用.它允许我做以下的事情:
private static async Task Test()
{
// Second resume
await Sleep(1000);
// Unknown how many resumes
}
private static async Task Main()
{
// First resume
await Coroutine.Yield();
// Second resume
await Test();
}
Run Code Online (Sandbox Code Playgroud)
这为游戏构建AI提供了一种非常好的方式.
nos*_*tio 11
我使用C#迭代器作为协同程序的替代品,它一直很好用.我想切换到async/await,因为我认为语法更清晰,它给了我类型安全性...
IMO,这是一个非常有趣的问题,虽然我需要一段时间才能完全理解它.也许,您没有提供足够的示例代码来说明这个概念.一个完整的应用程序会有所帮助,所以我会先尝试填补这个空白.以下代码说明了我理解的使用模式,如果我错了请纠正我:
using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
// https://stackoverflow.com/q/22852251/1768303
public class Program
{
class Resource : IDisposable
{
public void Dispose()
{
Console.WriteLine("Resource.Dispose");
}
~Resource()
{
Console.WriteLine("~Resource");
}
}
private IEnumerator Sleep(int milliseconds)
{
using (var resource = new Resource())
{
Stopwatch timer = Stopwatch.StartNew();
do
{
yield return null;
}
while (timer.ElapsedMilliseconds < milliseconds);
}
}
void EnumeratorTest()
{
var enumerator = Sleep(100);
enumerator.MoveNext();
Thread.Sleep(500);
//while (e.MoveNext());
((IDisposable)enumerator).Dispose();
}
public static void Main(string[] args)
{
new Program().EnumeratorTest();
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();
Console.ReadLine();
}
}
}
Run Code Online (Sandbox Code Playgroud)
在这里,Resource.Dispose因为被调用((IDisposable)enumerator).Dispose().如果我们不调用enumerator.Dispose(),那么我们必须取消注释//while (e.MoveNext());并让迭代器优雅地完成,以便正确展开.
现在,我认为实现此功能的最佳方法async/await是使用自定义awaiter:
using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
// https://stackoverflow.com/q/22852251/1768303
public class Program
{
class Resource : IDisposable
{
public void Dispose()
{
Console.WriteLine("Resource.Dispose");
}
~Resource()
{
Console.WriteLine("~Resource");
}
}
async Task SleepAsync(int milliseconds, Awaiter awaiter)
{
using (var resource = new Resource())
{
Stopwatch timer = Stopwatch.StartNew();
do
{
await awaiter;
}
while (timer.ElapsedMilliseconds < milliseconds);
}
Console.WriteLine("Exit SleepAsync");
}
void AwaiterTest()
{
var awaiter = new Awaiter();
var task = SleepAsync(100, awaiter);
awaiter.MoveNext();
Thread.Sleep(500);
//while (awaiter.MoveNext()) ;
awaiter.Dispose();
task.Dispose();
}
public static void Main(string[] args)
{
new Program().AwaiterTest();
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();
Console.ReadLine();
}
// custom awaiter
public class Awaiter :
System.Runtime.CompilerServices.INotifyCompletion,
IDisposable
{
Action _continuation;
readonly CancellationTokenSource _cts = new CancellationTokenSource();
public Awaiter()
{
Console.WriteLine("Awaiter()");
}
~Awaiter()
{
Console.WriteLine("~Awaiter()");
}
public void Cancel()
{
_cts.Cancel();
}
// let the client observe cancellation
public CancellationToken Token { get { return _cts.Token; } }
// resume after await, called upon external event
public bool MoveNext()
{
if (_continuation == null)
return false;
var continuation = _continuation;
_continuation = null;
continuation();
return _continuation != null;
}
// custom Awaiter methods
public Awaiter GetAwaiter()
{
return this;
}
public bool IsCompleted
{
get { return false; }
}
public void GetResult()
{
this.Token.ThrowIfCancellationRequested();
}
// INotifyCompletion
public void OnCompleted(Action continuation)
{
_continuation = continuation;
}
// IDispose
public void Dispose()
{
Console.WriteLine("Awaiter.Dispose()");
if (_continuation != null)
{
Cancel();
MoveNext();
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
什么时候放松,我请求取消内部Awaiter.Dispose并驱动状态机进入下一步(如果有待处理的继续).这导致观察内部的取消Awaiter.GetResult(由编译器生成的代码调用).这引发TaskCanceledException并进一步解除了using声明.所以,Resource得到妥善处理.最后,任务转换为已取消的状态(task.IsCancelled == true).
IMO,这是一种比在当前线程上安装自定义同步上下文更简单直接的方法.它可以很容易地适应多线程(这里有更多细节).
这应该给你比用IEnumerator/ 更多的自由yield.您可以try/catch在coroutine逻辑中使用,并且可以直接通过Task对象观察异常,取消和结果.
更新后,AFAIK对于迭代器的生成没有类比IDispose,当涉及到async状态机时.当你想取消/解除它时,你真的必须驱动状态机.如果你想占有一定疏忽使用的try/catch防止被取消,我认为你能做的最好的是检查是否_continuation为非空内Awaiter.Cancel(后MoveNext),并抛出一个致命异常乱的波段(使用辅助async void方法).
更新后,这已演变为一篇博文: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable。
现在是 2020 年,我关于和协程的其他答案对于await当今的 C# 语言标准来说已经过时了。C# 8.0 引入了对异步流的支持,具有以下新功能:
为了熟悉异步流的概念,我强烈建议您阅读Stephen Toub 所著的“在 C# 8 中使用异步枚举进行迭代”。
总之,这些新功能为在 C# 中以更自然的方式实现异步协同例程提供了很好的基础。
维基百科很好地解释了协程(又名 corotine)通常是什么。我想在这里展示的是协同例程如何async通过使用await和任意交换生产者/消费者的角色来暂停它们的执行流程,使用C# 8.0。
下面的代码片段应该说明这个概念。在这里,我们有两个协同例程,CoroutineA并且CoroutineB作为其伪线性执行流程进入在其上执行协同和异步,因放出到彼此。
namespace Tests
{
[TestClass]
public class CoroutineProxyTest
{
const string TRACE_CATEGORY = "coroutines";
/// <summary>
/// CoroutineA yields to CoroutineB
/// </summary>
private async IAsyncEnumerable<string> CoroutineA(
ICoroutineProxy<string> coroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
{
const string name = "A";
var i = 0;
// yielding 1
Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
yield return $"{i} from {name}";
// receiving
if (!await coroutine.MoveNextAsync())
{
yield break;
}
Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
// yielding 2
Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
yield return $"{i} from {name}";
// receiving
if (!await coroutine.MoveNextAsync())
{
yield break;
}
Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
// yielding 3
Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
yield return $"{i} from {name}";
}
}
/// <summary>
/// CoroutineB yields to CoroutineA
/// </summary>
private async IAsyncEnumerable<string> CoroutineB(
ICoroutineProxy<string> coroutineProxy,
[EnumeratorCancellation] CancellationToken token)
{
await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
{
const string name = "B";
var i = 0;
// receiving
if (!await coroutine.MoveNextAsync())
{
yield break;
}
Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
// yielding 1
Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
yield return $"{i} from {name}";
// receiving
if (!await coroutine.MoveNextAsync())
{
yield break;
}
Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
// yielding 2
Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
yield return $"{i} from {name}";
// receiving
if (!await coroutine.MoveNextAsync())
{
yield break;
}
Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
}
}
/// <summary>
/// Testing CoroutineA and CoroutineB cooperative execution
/// </summary>
[TestMethod]
public async Task Test_Coroutine_Execution_Flow()
{
// Here we execute two cotoutines, CoroutineA and CoroutineB,
// which asynchronously yield to each other
//TODO: test cancellation scenarios
var token = CancellationToken.None;
using (var apartment = new Tests.ThreadPoolApartment())
{
await apartment.Run(async () =>
{
var proxyA = new CoroutineProxy<string>();
var proxyB = new CoroutineProxy<string>();
var listener = new Tests.CategoryTraceListener(TRACE_CATEGORY);
Trace.Listeners.Add(listener);
try
{
// start both coroutines
await Task.WhenAll(
proxyA.Run(token => CoroutineA(proxyB, token), token),
proxyB.Run(token => CoroutineB(proxyA, token), token))
.WithAggregatedExceptions();
}
finally
{
Trace.Listeners.Remove(listener);
}
var traces = listener.ToArray();
Assert.AreEqual(traces[0], "A about to yeild: 1");
Assert.AreEqual(traces[1], "B received: 1 from A");
Assert.AreEqual(traces[2], "B about to yeild: 1");
Assert.AreEqual(traces[3], "A received: 1 from B");
Assert.AreEqual(traces[4], "A about to yeild: 2");
Assert.AreEqual(traces[5], "B received: 2 from A");
Assert.AreEqual(traces[6], "B about to yeild: 2");
Assert.AreEqual(traces[7], "A received: 2 from B");
Assert.AreEqual(traces[8], "A about to yeild: 3");
Assert.AreEqual(traces[9], "B received: 3 from A");
});
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
测试的输出如下所示:
协程:A 即将产生:1 协程:B 收到:1 来自 A 协程:B 即将产生:1 协程:A 收到:1 来自 B 协程:A 即将产生:2 协程:B 收到:2 来自 A 协程:B 即将产生:2 协程:A 收到:2 来自 B 协程:A 即将产生:3 协程:B 收到:来自 A 的 3 个
我目前在我的一些自动化 UI 测试场景中使用异步协程。例如,我可能有一个在 UI 线程上运行的异步测试工作流逻辑(即CouroutineA)和一个ThreadPool作为[TestMethod]方法的一部分在线程上运行的补充工作流(即CouroutineB)。
然后,我可以做这样的事情await WaitForUserInputAsync(); yield return true;,在某些关键点,同步CouroutineA和CouroutineB协作执行流程。
如果没有,yield return我将不得不使用某种形式的异步同步原语,例如 Stephen Toub 的AsyncManualResetEvent. 我个人觉得使用协程是进行这种同步的更自然的方式。
针对码CoroutineProxy(其驱动协同例程的执行)仍然是一个工作在进展。它目前使用 TPL DataflowBufferBlock作为代理队列来协调异步执行,我还不确定这是否是一种最佳方式。目前,这是它的样子:
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
#nullable enable
namespace Tests
{
public interface ICoroutineProxy<T>
{
public Task<IAsyncEnumerable<T>> AsAsyncEnumerable(CancellationToken token = default);
}
public static class CoroutineProxyExt
{
public async static Task<IAsyncEnumerator<T>> AsAsyncEnumerator<T>(
this ICoroutineProxy<T> @this,
CancellationToken token = default)
{
return (await @this.AsAsyncEnumerable(token)).GetAsyncEnumerator(token);
}
}
public class CoroutineProxy<T> : ICoroutineProxy<T>
{
readonly TaskCompletionSource<IAsyncEnumerable<T>> _proxyTcs =
new TaskCompletionSource<IAsyncEnumerable<T>>(TaskCreationOptions.RunContinuationsAsynchronously);
public CoroutineProxy()
{
}
private async IAsyncEnumerable<T> CreateProxyAsyncEnumerable(
ISourceBlock<T> bufferBlock,
[EnumeratorCancellation] CancellationToken token)
{
var completionTask = bufferBlock.Completion;
while (true)
{
var itemTask = bufferBlock.ReceiveAsync(token);
var any = await Task.WhenAny(itemTask, completionTask);
if (any == completionTask)
{
// observe completion exceptions if any
await completionTask;
yield break;
}
yield return await itemTask;
}
}
async Task<IAsyncEnumerable<T>> ICoroutineProxy<T>.AsAsyncEnumerable(CancellationToken token)
{
using (token.Register(() => _proxyTcs.TrySetCanceled(), useSynchronizationContext: true))
{
return await _proxyTcs.Task;
}
}
public async Task Run(Func<CancellationToken, IAsyncEnumerable<T>> routine, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var bufferBlock = new BufferBlock<T>();
var proxy = CreateProxyAsyncEnumerable(bufferBlock, token);
_proxyTcs.SetResult(proxy); // throw if already set
try
{
//TODO: do we need to use routine(token).WithCancellation(token) ?
await foreach (var item in routine(token))
{
await bufferBlock.SendAsync(item, token);
}
bufferBlock.Complete();
}
catch (Exception ex)
{
((IDataflowBlock)bufferBlock).Fault(ex);
throw;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7022 次 |
| 最近记录: |