异步/等待作为协同程序的替代品

jak*_*sch 14 c# asynchronous coroutine async-await

我使用C#迭代器作为协同程序的替代品,它一直很好用.我想切换到async/await,因为我认为语法更清晰,它给了我类型安全. 在这篇(过时的)博客文章中,Jon Skeet展示了实现它的可能方式.

我选择采用略有不同的方式(通过实现我自己SynchronizationContext和使用Task.Yield).这很好.

然后我意识到会有问题; 目前协程不必完成运行.它可以在任何产生的点上优雅地停止.我们可能有这样的代码:

private IEnumerator Sleep(int milliseconds)
{
    Stopwatch timer = Stopwatch.StartNew();
    do
    {
        yield return null;
    }
    while (timer.ElapsedMilliseconds < milliseconds);
}

private IEnumerator CoroutineMain()
{
    try
    {
        // Do something that runs over several frames
        yield return Coroutine.Sleep(5000);
    }
    finally
    {
        Log("Coroutine finished, either after 5 seconds, or because it was stopped");
    }
}
Run Code Online (Sandbox Code Playgroud)

协程通过跟踪堆栈中的所有枚举器来工作.C#编译器生成一个Dispose函数,可以调用该函数以确保正确调用'finally'块CoroutineMain,即使枚举未完成.这样我们就可以优雅地停止协程,并通过调用堆栈Dispose上的所有IEnumerator对象来确保最终调用块.这基本上是手动展开.

当我用async/await编写我的实现时,我意识到我们会失去这个功能,除非我弄错了.然后我查找了其他协同解决方案,看起来Jon Skeet的版本看起来也没有任何处理方式.

我能想到处理这个问题的唯一方法就是拥有我们自己的自定义'Yield'函数,它会检查协程是否被停止,然后引发一个表明这个的异常.这将传播,执行finally块,然后被捕获到根附近的某处.我不觉得这很漂亮,因为第三方代码可能会捕获异常.

我误解了什么,这是否可以更容易地做到?或者我需要以异常方式执行此操作吗?

编辑:已请求更多信息/代码,所以这里有一些.我可以保证这只会在一个线程上运行,所以这里没有涉及线程.我们当前的协程实现看起来有点像这样(这是简化的,但它适用于这个简单的情况):

public sealed class Coroutine : IDisposable
{
    private class RoutineState
    {
        public RoutineState(IEnumerator enumerator)
        {
            Enumerator = enumerator;
        }

        public IEnumerator Enumerator { get; private set; }
    }

    private readonly Stack<RoutineState> _enumStack = new Stack<RoutineState>();

    public Coroutine(IEnumerator enumerator)
    {
        _enumStack.Push(new RoutineState(enumerator));
    }

    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        if (IsDisposed)
            return;

        while (_enumStack.Count > 0)
        {
            DisposeEnumerator(_enumStack.Pop().Enumerator);
        }

        IsDisposed = true;
    }

    public bool Resume()
    {
        while (true)
        {
            RoutineState top = _enumStack.Peek();
            bool movedNext;

            try
            {
                movedNext = top.Enumerator.MoveNext();
            }
            catch (Exception ex)
            {
                // Handle exception thrown by coroutine
                throw;
            }

            if (!movedNext)
            {
                // We finished this (sub-)routine, so remove it from the stack
                _enumStack.Pop();

                // Clean up..
                DisposeEnumerator(top.Enumerator);


                if (_enumStack.Count <= 0)
                {
                    // This was the outer routine, so coroutine is finished.
                    return false;
                }

                // Go back and execute the parent.
                continue;
            }

            // We executed a step in this coroutine. Check if a subroutine is supposed to run..
            object value = top.Enumerator.Current;
            IEnumerator newEnum = value as IEnumerator;
            if (newEnum != null)
            {
                // Our current enumerator yielded a new enumerator, which is a subroutine.
                // Push our new subroutine and run the first iteration immediately
                RoutineState newState = new RoutineState(newEnum);
                _enumStack.Push(newState);

                continue;
            }

            // An actual result was yielded, so we've completed an iteration/step.
            return true;
        }
    }

    private static void DisposeEnumerator(IEnumerator enumerator)
    {
        IDisposable disposable = enumerator as IDisposable;
        if (disposable != null)
            disposable.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

假设我们有如下代码:

private IEnumerator MoveToPlayer()
{
  try
  {
    while (!AtPlayer())
    {
      yield return Sleep(500); // Move towards player twice every second
      CalculatePosition();
    }
  }
  finally
  {
    Log("MoveTo Finally");
  }
}

private IEnumerator OrbLogic()
{
  try
  {
    yield return MoveToPlayer();
    yield return MakeExplosion();
  }
  finally
  {
    Log("OrbLogic Finally");
  }
}
Run Code Online (Sandbox Code Playgroud)

这可以通过将OrbLogic枚举器的实例传递给Coroutine,然后运行它来创建.这允许我们每帧勾选协程.如果玩家杀死了球,则协程未完成运行 ; 只需在协程上调用Dispose即可.如果MoveTo逻辑上在'try'块中,则在语句上调用顶部的Dispose IEnumerator将使finally块处于MoveTo执行状态.然后finally,OrbLogic中的块将执行.请注意,这是一个简单的案例,案例要复杂得多.

我正在努力在async/await版本中实现类似的行为.此版本的代码如下所示(省略错误检查):

public class Coroutine
{
    private readonly CoroutineSynchronizationContext _syncContext = new CoroutineSynchronizationContext();

    public Coroutine(Action action)
    {
        if (action == null)
            throw new ArgumentNullException("action");

        _syncContext.Next = new CoroutineSynchronizationContext.Continuation(state => action(), null);
    }

    public bool IsFinished { get { return !_syncContext.Next.HasValue; } }

    public void Tick()
    {
        if (IsFinished)
            throw new InvalidOperationException("Cannot resume Coroutine that has finished");

        SynchronizationContext curContext = SynchronizationContext.Current;
        try
        {
            SynchronizationContext.SetSynchronizationContext(_syncContext);

            // Next is guaranteed to have value because of the IsFinished check
            Debug.Assert(_syncContext.Next.HasValue);

            // Invoke next continuation
            var next = _syncContext.Next.Value;
            _syncContext.Next = null;

            next.Invoke();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(curContext);
        }
    }
}

public class CoroutineSynchronizationContext : SynchronizationContext
{
    internal struct Continuation
    {
        public Continuation(SendOrPostCallback callback, object state)
        {
            Callback = callback;
            State = state;
        }

        public SendOrPostCallback Callback;
        public object State;

        public void Invoke()
        {
            Callback(State);
        }
    }

    internal Continuation? Next { get; set; }

    public override void Post(SendOrPostCallback callback, object state)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        if (Current != this)
            throw new InvalidOperationException("Cannot Post to CoroutineSynchronizationContext from different thread!");

        Next = new Continuation(callback, state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        throw new NotSupportedException();
    }

    public override SynchronizationContext CreateCopy()
    {
        throw new NotSupportedException();
    }
}
Run Code Online (Sandbox Code Playgroud)

我没有看到如何使用它实现与迭代器版本类似的行为.为漫长的代码提前道歉!

编辑2:新方法似乎正在起作用.它允许我做以下的事情:

private static async Task Test()
{
    // Second resume
    await Sleep(1000);
    // Unknown how many resumes
}

private static async Task Main()
{
    // First resume
    await Coroutine.Yield();
    // Second resume
    await Test();
}
Run Code Online (Sandbox Code Playgroud)

这为游戏构建AI提供了一种非常好的方式.

nos*_*tio 11

我使用C#迭代器作为协同程序的替代品,它一直很好用.我想切换到async/await,因为我认为语法更清晰,它给了我类型安全性...

IMO,这是一个非常有趣的问题,虽然我需要一段时间才能完全理解它.也许,您没有提供足够的示例代码来说明这个概念.一个完整的应用程序会有所帮助,所以我会先尝试填补这个空白.以下代码说明了我理解的使用模式,如果我错了请纠正我:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303

    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        private IEnumerator Sleep(int milliseconds)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    yield return null;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
        }

        void EnumeratorTest()
        {
            var enumerator = Sleep(100);
            enumerator.MoveNext();
            Thread.Sleep(500);
            //while (e.MoveNext());
            ((IDisposable)enumerator).Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().EnumeratorTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在这里,Resource.Dispose因为被调用((IDisposable)enumerator).Dispose().如果我们不调用enumerator.Dispose(),那么我们必须取消注释//while (e.MoveNext());并让迭代器优雅地完成,以便正确展开.

现在,我认为实现此功能的最佳方法async/await是使用自定义awaiter:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303
    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        async Task SleepAsync(int milliseconds, Awaiter awaiter)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    await awaiter;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
            Console.WriteLine("Exit SleepAsync");
        }

        void AwaiterTest()
        {
            var awaiter = new Awaiter();
            var task = SleepAsync(100, awaiter);
            awaiter.MoveNext();
            Thread.Sleep(500);

            //while (awaiter.MoveNext()) ;
            awaiter.Dispose();
            task.Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().AwaiterTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion,
            IDisposable
        {
            Action _continuation;
            readonly CancellationTokenSource _cts = new CancellationTokenSource();

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            public void Cancel()
            {
                _cts.Cancel();
            }

            // let the client observe cancellation
            public CancellationToken Token { get { return _cts.Token; } }

            // resume after await, called upon external event
            public bool MoveNext()
            {
                if (_continuation == null)
                    return false;

                var continuation = _continuation;
                _continuation = null;
                continuation();
                return _continuation != null;
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
                this.Token.ThrowIfCancellationRequested();
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                _continuation = continuation;
            }

            // IDispose
            public void Dispose()
            {
                Console.WriteLine("Awaiter.Dispose()");
                if (_continuation != null)
                {
                    Cancel();
                    MoveNext();
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

什么时候放松,我请求取消内部Awaiter.Dispose并驱动状态机进入下一步(如果有待处理的继续).这导致观察内部的取消Awaiter.GetResult(由编译器生成的代码调用).这引发TaskCanceledException并进一步解除了using声明.所以,Resource得到妥善处理.最后,任务转换为已取消的状态(task.IsCancelled == true).

IMO,这是一种比在当前线程上安装自定义同步上下文更简单直接的方法.它可以很容易地适应多线程(这里有更多细节).

这应该给你比用IEnumerator/ 更多的自由yield.您可以try/catch在coroutine逻辑中使用,并且可以直接通过Task对象观察异常,取消和结果.

更新后,AFAIK对于迭代器的生成没有类比IDispose,当涉及到async状态机时.当你想取消/解除它时,你真的必须驱动状态机.如果你想占有一定疏忽使用的try/catch防止被取消,我认为你能做的最好的是检查是否_continuation为非空内Awaiter.Cancel(后MoveNext),并抛出一个致命异常乱的波段(使用辅助async void方法).


nos*_*tio 7

更新后,这已演变为一篇博文: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable


现在是 2020 年,我关于和协程的其他答案对于await当今的 C# 语言标准来说已经过时了。C# 8.0 引入了对异步流的支持,具有以下新功能:

为了熟悉异步流的概念,我强烈建议您阅读Stephen Toub 所著的“在 C# 8 中使用异步枚举进行迭代”

总之,这些新功能为在 C# 中以更自然的方式实现异步协同例程提供了很好的基础。

维基百科很好地解释了协程(又名 corotine)通常是什么。我想在这里展示的是协同例程如何async通过使用await和任意交换生产者/消费者的角色来暂停它们的执行流程,使用C# 8.0。

下面的代码片段应该说明这个概念。在这里,我们有两个协同例程,CoroutineA并且CoroutineB作为其伪线性执行流程进入在其上执行协同和异步,因放出到彼此。

namespace Tests
{
    [TestClass]
    public class CoroutineProxyTest
    {
        const string TRACE_CATEGORY = "coroutines";

        /// <summary>
        /// CoroutineA yields to CoroutineB
        /// </summary>
        private async IAsyncEnumerable<string> CoroutineA(
            ICoroutineProxy<string> coroutineProxy,
            [EnumeratorCancellation] CancellationToken token)
        {
            await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
            {
                const string name = "A";
                var i = 0;

                // yielding 1
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 2
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 3
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";
            }
        }

        /// <summary>
        /// CoroutineB yields to CoroutineA
        /// </summary>
        private async IAsyncEnumerable<string> CoroutineB(
            ICoroutineProxy<string> coroutineProxy,
            [EnumeratorCancellation] CancellationToken token)
        {
            await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
            {
                const string name = "B";
                var i = 0;

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 1
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 2
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
            }
        }

        /// <summary>
        /// Testing CoroutineA and CoroutineB cooperative execution
        /// </summary>
        [TestMethod] 
        public async Task Test_Coroutine_Execution_Flow()
        {
            // Here we execute two cotoutines, CoroutineA and CoroutineB,
            // which asynchronously yield to each other

            //TODO: test cancellation scenarios
            var token = CancellationToken.None;

            using (var apartment = new Tests.ThreadPoolApartment())
            {
                await apartment.Run(async () =>
                {
                    var proxyA = new CoroutineProxy<string>();
                    var proxyB = new CoroutineProxy<string>();

                    var listener = new Tests.CategoryTraceListener(TRACE_CATEGORY);
                    Trace.Listeners.Add(listener);
                    try
                    {
                        // start both coroutines
                        await Task.WhenAll(
                            proxyA.Run(token => CoroutineA(proxyB, token), token),
                            proxyB.Run(token => CoroutineB(proxyA, token), token))
                            .WithAggregatedExceptions();
                    }
                    finally
                    {
                        Trace.Listeners.Remove(listener);
                    }

                    var traces = listener.ToArray();
                    Assert.AreEqual(traces[0], "A about to yeild: 1");
                    Assert.AreEqual(traces[1], "B received: 1 from A");
                    Assert.AreEqual(traces[2], "B about to yeild: 1");
                    Assert.AreEqual(traces[3], "A received: 1 from B");
                    Assert.AreEqual(traces[4], "A about to yeild: 2");
                    Assert.AreEqual(traces[5], "B received: 2 from A");
                    Assert.AreEqual(traces[6], "B about to yeild: 2");
                    Assert.AreEqual(traces[7], "A received: 2 from B");
                    Assert.AreEqual(traces[8], "A about to yeild: 3");
                    Assert.AreEqual(traces[9], "B received: 3 from A");
                });
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

测试的输出如下所示:

协程:A 即将产生:1
协程:B 收到:1 来自 A
协程:B 即将产生:1
协程:A 收到:1 来自 B
协程:A 即将产生:2
协程:B 收到:2 来自 A
协程:B 即将产生:2
协程:A 收到:2 来自 B
协程:A 即将产生:3
协程:B 收到:来自 A 的 3 个

我目前在我的一些自动化 UI 测试场景中使用异步协程。例如,我可能有一个在 UI 线程上运行的异步测试工作流逻辑(即CouroutineA)和一个ThreadPool作为[TestMethod]方法的一部分在线程上运行的补充工作流(即CouroutineB)。

然后,我可以做这样的事情await WaitForUserInputAsync(); yield return true;,在某些关键点,同步CouroutineACouroutineB协作执行流程。

如果没有,yield return我将不得不使用某种形式的异步同步原语,例如 Stephen Toub 的AsyncManualResetEvent. 我个人觉得使用协程是进行这种同步的更自然的方式。

针对码CoroutineProxy(其驱动协同例程的执行)仍然是一个工作在进展。它目前使用 TPL DataflowBufferBlock作为代理队列来协调异步执行,我还不确定这是否是一种最佳方式。目前,这是它的样子:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

#nullable enable

namespace Tests
{
    public interface ICoroutineProxy<T>
    {
        public Task<IAsyncEnumerable<T>> AsAsyncEnumerable(CancellationToken token = default);
    }

    public static class CoroutineProxyExt
    {
        public async static Task<IAsyncEnumerator<T>> AsAsyncEnumerator<T>(
            this ICoroutineProxy<T> @this,
            CancellationToken token = default)
        {
            return (await @this.AsAsyncEnumerable(token)).GetAsyncEnumerator(token);
        }
    }

    public class CoroutineProxy<T> : ICoroutineProxy<T>
    {
        readonly TaskCompletionSource<IAsyncEnumerable<T>> _proxyTcs =
            new TaskCompletionSource<IAsyncEnumerable<T>>(TaskCreationOptions.RunContinuationsAsynchronously);

        public CoroutineProxy()
        {
        }

        private async IAsyncEnumerable<T> CreateProxyAsyncEnumerable(
            ISourceBlock<T> bufferBlock,
            [EnumeratorCancellation] CancellationToken token)
        {
            var completionTask = bufferBlock.Completion;
            while (true)
            {
                var itemTask = bufferBlock.ReceiveAsync(token);
                var any = await Task.WhenAny(itemTask, completionTask);
                if (any == completionTask)
                {
                    // observe completion exceptions if any
                    await completionTask; 
                    yield break;
                }
                yield return await itemTask;
            }
        }

        async Task<IAsyncEnumerable<T>> ICoroutineProxy<T>.AsAsyncEnumerable(CancellationToken token)
        {
            using (token.Register(() => _proxyTcs.TrySetCanceled(), useSynchronizationContext: true))
            {
                return await _proxyTcs.Task;
            }
        }

        public async Task Run(Func<CancellationToken, IAsyncEnumerable<T>> routine, CancellationToken token)
        {
            token.ThrowIfCancellationRequested();

            var bufferBlock = new BufferBlock<T>();
            var proxy = CreateProxyAsyncEnumerable(bufferBlock, token);
            _proxyTcs.SetResult(proxy); // throw if already set

            try
            {
                //TODO: do we need to use routine(token).WithCancellation(token) ?
                await foreach (var item in routine(token))
                {
                    await bufferBlock.SendAsync(item, token);
                }
                bufferBlock.Complete();
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)bufferBlock).Fault(ex);
                throw;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)