异步/等待,自定义awaiter和垃圾收集器

nos*_*tio 25 .net c# garbage-collection task-parallel-library async-await

我正在处理一个托管对象在async方法中间过早完成的情况.

这是一个业余爱好家庭自动化项目(Windows 8.1,.NET 4.5.1),我向非托管第三方DLL提供C#回调.在某个传感器事件时调用回调.

为了处理这个事件,我使用async/await了一个简单的自定义awaiter(而不是TaskCompletionSource).我这样做的部分原因是为了减少不必要的分配数量,但主要是出于好奇心作为学习练习.

下面是我所拥有的非常剥离的版本,使用Win32计时器队列计时器来模拟非托管事件源.让我们从输出开始:

Press Enter to exit...
Awaiter()
tick: 0
tick: 1
~Awaiter()
tick: 2
tick: 3
tick: 4

请注意我的等待者在第二次打勾后如何最终确定.这是出乎意料的.

代码(控制台应用程序):

using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        static async Task TestAsync()
        {
            var awaiter = new Awaiter();
            //var hold = GCHandle.Alloc(awaiter);

            WaitOrTimerCallbackProc callback = (a, b) =>
                awaiter.Continue();

            IntPtr timerHandle;
            if (!CreateTimerQueueTimer(out timerHandle, 
                    IntPtr.Zero, 
                    callback, 
                    IntPtr.Zero, 500, 500, 0))
                throw new System.ComponentModel.Win32Exception(
                    Marshal.GetLastWin32Error());

            var i = 0;
            while (true)
            {
                await awaiter;
                Console.WriteLine("tick: " + i++);
            }
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Press Enter to exit...");
            var task = TestAsync();
            Thread.Sleep(1000);
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter : 
            System.Runtime.CompilerServices.INotifyCompletion
        {
            Action _continuation;

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            // resume after await, called upon external event
            public void Continue()
            {
                var continuation = Interlocked.Exchange(ref _continuation, null);
                if (continuation != null)
                    continuation();
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                Volatile.Write(ref _continuation, continuation);
            }
        }

        // p/invoke
        delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired);

        [DllImport("kernel32.dll")]
        static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer,
           IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter,
           uint DueTime, uint Period, uint Flags);
    }
}
Run Code Online (Sandbox Code Playgroud)

我设法awaiter用这一行压制了这个集合:

var hold = GCHandle.Alloc(awaiter);
Run Code Online (Sandbox Code Playgroud)

但是,我不完全理解为什么我必须创建这样的强引用.将awaiter是一个无限循环中被引用.AFAICT,在返回的任务TestAsync完成(取消/故障)之前,它不会超出范围.任务本身Main永远在内部引用.

最终,我减少TestAsync到这个:

static async Task TestAsync()
{
    var awaiter = new Awaiter();
    //var hold = GCHandle.Alloc(awaiter);

    var i = 0;
    while (true)
    {
        await awaiter;
        Console.WriteLine("tick: " + i++);
    }
}
Run Code Online (Sandbox Code Playgroud)

收藏仍然发生.我怀疑整个编译器生成的状态机对象正在收集.有人可以解释为什么会这样吗?

现在,通过以下小修改,awaiter不再进行垃圾回收:

static async Task TestAsync()
{
    var awaiter = new Awaiter();
    //var hold = GCHandle.Alloc(awaiter);

    var i = 0;
    while (true)
    {
        //await awaiter;
        await Task.Delay(500);
        Console.WriteLine("tick: " + i++);
    }
}
Run Code Online (Sandbox Code Playgroud)

更新后,这个小提示显示了如何在awaiter没有任何p/invoke代码的情况下对对象进行垃圾收集.我认为,原因可能是在生成的状态机对象的初始状态之外没有外部引用.我需要研究编译器生成的代码.awaiter


更新了,这是编译器生成的代码(对于这个小提琴,VS2012).显然,Task返回者stateMachine.t__builder.Task不会保留对状态机本身(stateMachine)的引用(或者更确切地说是副本).我错过了什么吗?

    private static Task TestAsync()
    {
      Program.TestAsyncd__0 stateMachine;
      stateMachine.t__builder = AsyncTaskMethodBuilder.Create();
      stateMachine.1__state = -1;
      stateMachine.t__builder.Start<Program.TestAsyncd__0>(ref stateMachine);
      return stateMachine.t__builder.Task;
    }

    [CompilerGenerated]
    [StructLayout(LayoutKind.Auto)]
    private struct TestAsyncd__0 : IAsyncStateMachine
    {
      public int 1__state;
      public AsyncTaskMethodBuilder t__builder;
      public Program.Awaiter awaiter5__1;
      public int i5__2;
      private object u__awaiter3;
      private object t__stack;

      void IAsyncStateMachine.MoveNext()
      {
        try
        {
          bool flag = true;
          Program.Awaiter awaiter;
          switch (this.1__state)
          {
            case -3:
              goto label_7;
            case 0:
              awaiter = (Program.Awaiter) this.u__awaiter3;
              this.u__awaiter3 = (object) null;
              this.1__state = -1;
              break;
            default:
              this.awaiter5__1 = new Program.Awaiter();
              this.i5__2 = 0;
              goto label_5;
          }
label_4:
          awaiter.GetResult();
          Console.WriteLine("tick: " + (object) this.i5__2++);
label_5:
          awaiter = this.awaiter5__1.GetAwaiter();
          if (!awaiter.IsCompleted)
          {
            this.1__state = 0;
            this.u__awaiter3 = (object) awaiter;
            this.t__builder.AwaitOnCompleted<Program.Awaiter, Program.TestAsyncd__0>(ref awaiter, ref this);
            flag = false;
            return;
          }
          else
            goto label_4;
        }
        catch (Exception ex)
        {
          this.1__state = -2;
          this.t__builder.SetException(ex);
          return;
        }
label_7:
        this.1__state = -2;
        this.t__builder.SetResult();
      }

      [DebuggerHidden]
      void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0)
      {
        this.t__builder.SetStateMachine(param0);
      }
    }
Run Code Online (Sandbox Code Playgroud)

nos*_*tio 15

我删除了所有p/invoke的东西,并重新创建了编译器生成的状态机逻辑的简化版本.它表现出相同的行为:awaiter在第一次调用状态机的MoveNext方法之后收集garabage .

Microsoft最近在为其.NET参考源提供Web UI方面做得非常出色,这非常有用.在研究了实施之后AsyncTaskMethodBuilder,最重要的是AsyncMethodBuilderCore.GetCompletionAction,我现在相信我所看到的GC行为非常有意义.我将尝试在下面解释.

代码:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.Runtime.CompilerServices;

namespace ConsoleApplication
{
    public class Program
    {
        // Original version with async/await

        /*
        static async Task TestAsync()
        {
            Console.WriteLine("Enter TestAsync");
            var awaiter = new Awaiter();
            //var hold = GCHandle.Alloc(awaiter);

            var i = 0;
            while (true)
            {
                await awaiter;
                Console.WriteLine("tick: " + i++);
            }
            Console.WriteLine("Exit TestAsync");
        }
        */

        // Manually coded state machine version

        struct StateMachine: IAsyncStateMachine
        {
            public int _state;
            public Awaiter _awaiter;
            public AsyncTaskMethodBuilder _builder;

            public void MoveNext()
            {
                Console.WriteLine("StateMachine.MoveNext, state: " + this._state);
                switch (this._state)
                {
                    case -1:
                        {
                            this._awaiter = new Awaiter();
                            goto case 0;
                        };
                    case 0:
                        {
                            this._state = 0;
                            var awaiter = this._awaiter;
                            this._builder.AwaitOnCompleted(ref awaiter, ref this);
                            return;
                        };

                    default:
                        throw new InvalidOperationException();
                }
            }

            public void SetStateMachine(IAsyncStateMachine stateMachine)
            {
                Console.WriteLine("StateMachine.SetStateMachine, state: " + this._state);
                this._builder.SetStateMachine(stateMachine);
                // s_strongRef = stateMachine;
            }

            static object s_strongRef = null;
        }

        static Task TestAsync()
        {
            StateMachine stateMachine = new StateMachine();
            stateMachine._state = -1;

            stateMachine._builder = AsyncTaskMethodBuilder.Create();
            stateMachine._builder.Start(ref stateMachine);

            return stateMachine._builder.Task;
        }

        public static void Main(string[] args)
        {
            var task = TestAsync();
            Thread.Sleep(1000);
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion
        {
            Action _continuation;

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            // resume after await, called upon external event
            public void Continue()
            {
                var continuation = Interlocked.Exchange(ref _continuation, null);
                if (continuation != null)
                    continuation();
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                Console.WriteLine("Awaiter.OnCompleted");
                Volatile.Write(ref _continuation, continuation);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

编译器生成的状态机是一个可变结构,由它传递ref.显然,这是一个优化,以避免额外的分配.

其核心部分发生在内部AsyncMethodBuilderCore.GetCompletionAction,当前状态机结构被装箱,并且盒装副本的引用由传递给的连续回调保存INotifyCompletion.OnCompleted.

这是对状态机的唯一参考,它有机会站在GC后继续存活await.在Task通过返回的对象TestAsync没有持有对它的引用,只有await继续回调做.我相信这是故意的,以保持高效的GC行为.

注意注释行:

// s_strongRef = stateMachine;
Run Code Online (Sandbox Code Playgroud)

如果我取消它的注释,状态机的盒装副本就不会得到GC,并且awaiter作为它的一部分保持活着.当然,这不是解决方案,但它说明了问题.

所以,我得出以下结论.虽然异步操作处于" MoveNext正在进行中"并且状态机的状态()当前都没有被执行,但是继续回调的"守护者"的责任是强制保留回调本身,以确保状态机的盒装副本不会被垃圾收集.

例如,在YieldAwaitable(返回Task.Yield)的情况下ThreadPool,作为ThreadPool.QueueUserWorkItem调用的结果,任务调度程序保留对延续回调的外部引用.如果是Task.GetAwaiter,它由任务对象间接引用.

就我而言,延续回调的"守护者"就是它Awaiter本身.

因此,只要CLR不知道(在状态机对象之外)继续回调的外部引用,自定义等待者应该采取措施使回调对象保持活动状态.反过来,这将使整个状态机保持活力.在这种情况下,以下步骤是必要的:

  1. 打电话给GCHandle.Alloc回拨INotifyCompletion.OnCompleted.
  2. 打电话GCHandle.Free时,异步事件已经实际发生,调用延续回调之前.
  3. 如果事件从未发生,则实现IDispose调用GCHandle.Free.

鉴于此,下面是原始计时器回调代码的一个版本,它可以正常工作.注意,没有必要强制保持定时器回调委托(WaitOrTimerCallbackProc callback).它作为状态机的一部分保持活着. 更新:正如@svick所指出的,此语句可能特定于状态机的当前实现(C#5.0).我已添加GC.KeepAlive(callback)以消除对此行为的任何依赖,以防它在将来的编译器版本中发生更改.

using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        // Test task
        static async Task TestAsync(CancellationToken token)
        {
            using (var awaiter = new Awaiter())
            {
                WaitOrTimerCallbackProc callback = (a, b) =>
                    awaiter.Continue();
                try
                {
                    IntPtr timerHandle;
                    if (!CreateTimerQueueTimer(out timerHandle,
                            IntPtr.Zero,
                            callback,
                            IntPtr.Zero, 500, 500, 0))
                        throw new System.ComponentModel.Win32Exception(
                            Marshal.GetLastWin32Error());
                    try
                    {
                        var i = 0;
                        while (true)
                        {
                            token.ThrowIfCancellationRequested();
                            await awaiter;
                            Console.WriteLine("tick: " + i++);
                        }
                    }
                    finally
                    {
                        DeleteTimerQueueTimer(IntPtr.Zero, timerHandle, IntPtr.Zero);
                    }
                }
                finally
                {
                    // reference the callback at the end
                    // to avoid a chance for it to be GC'ed
                    GC.KeepAlive(callback);
                }
            }
        }

        // Entry point
        static void Main(string[] args)
        {
            // cancel in 3s
            var testTask = TestAsync(new CancellationTokenSource(10 * 1000).Token);

            Thread.Sleep(1000);
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);

            Thread.Sleep(2000);
            Console.WriteLine("Press Enter to GC...");
            Console.ReadLine();

            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }

        // Custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion,
            IDisposable
        {
            Action _continuation;
            GCHandle _hold = new GCHandle();

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            void ReleaseHold()
            {
                if (_hold.IsAllocated)
                    _hold.Free();
            }

            // resume after await, called upon external event
            public void Continue()
            {
                Action continuation;

                // it's OK to use lock (this)
                // the C# compiler would never do this,
                // because it's slated to work with struct awaiters
                lock (this)
                {
                    continuation = _continuation;
                    _continuation = null;
                    ReleaseHold();
                }

                if (continuation != null)
                    continuation();
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                lock (this)
                {
                    ReleaseHold();
                    _continuation = continuation;
                    _hold = GCHandle.Alloc(_continuation);
                }
            }

            // IDispose
            public void Dispose()
            {
                lock (this)
                {
                    _continuation = null;
                    ReleaseHold();
                }
            }
        }

        // p/invoke
        delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired);

        [DllImport("kernel32.dll")]
        static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer,
            IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter,
            uint DueTime, uint Period, uint Flags);

        [DllImport("kernel32.dll")]
        static extern bool DeleteTimerQueueTimer(IntPtr TimerQueue, IntPtr Timer,
            IntPtr CompletionEvent);
    }
}
Run Code Online (Sandbox Code Playgroud)

  • @Noseratio不保证是GCed.你是对的,它不适用于当前的C#编译器,因为本地将被转换为状态机类型的字段.但编译器不必这样做. (2认同)
  • @Noseratio第二次阅读这个问题和答案,绝对令人惊叹.一个用于'async-await`万神殿! (2认同)