TaskContinuationOptions.RunContinuations异步和Stack Dives

Yuv*_*kov 21 .net c# task-parallel-library async-await .net-4.6

这篇博文中,Stephan Toub描述了一个将包含在.NET 4.6中的新功能,它为被调用的TaskCreationOptions和TaskContinuationOptions枚举增加了另一个值RunContinuationsAsynchronously.

他解释说:

"我谈到了在TaskCompletionSource上调用{Try} Set*方法的分支,TaskCompletionSource的Task的任何同步延续都可以作为调用的一部分同步运行.如果我们在这里调用SetResult同时持有锁,那么同步延续关闭那个任务将在持有锁的同时运行,这可能会导致非常真实的问题.所以,在持有锁时我们抓住TaskCompletionSource来完成,但我们还没有完成它,延迟这样做直到锁定已被释放"

并给出以下示例来演示:

private SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private async Task WorkAsync()
{
    await _gate.WaitAsync().ConfigureAwait(false);
    try
    {
        // work here
    }
    finally { _gate.Release(); }
}
Run Code Online (Sandbox Code Playgroud)

现在假设你有很多对WorkAsync的调用:

await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());
Run Code Online (Sandbox Code Playgroud)

我们刚刚创建了10,000个对WorkAsync的调用,这些调用将在信号量上进行适当的序列化.其中一个任务将进入关键区域,其他任务将在WaitAsync调用上排队,SemaphoreSlim内部有效地将任务调用完成,当有人调用Release时.如果Release同步完成了Task,那么当第一个任务调用Release时,它将同步开始执行第二个任务,当它调用Release时,它将同步开始执行第三个任务,依此类推.如果上面代码的"// work here"部分没有包含任何等待产生的东西,那么我们可能会在这里堆叠潜水并最终可能导致堆栈爆炸.

我很难掌握他谈论同步执行延续的部分.

怎么可能导致堆栈潜水?更重要的是RunContinuationsAsynchronously,为了解决这个问题,有效的做法是什么?

i3a*_*non 13

这里的关键概念是任务的延续可以在完成先前任务的同一线程上同步运行.

让我们假设这是SemaphoreSlim.Release实现(实际上是Toub的AsyncSemphore):

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        toRelease.SetResult(true); 
}
Run Code Online (Sandbox Code Playgroud)

我们可以看到它同步完成一项任务(使用TaskCompletionSource).在这种情况下,如果WorkAsync没有其他异步点(即根本没有awaits,或者所有awaits都在已完成的任务上)并且调用_gate.Release()可能_gate.WaitAsync()在同一线程上同步完成挂起调用,则可能会达到单个线程的状态顺序释放信号量,完成下一个待处理的呼叫,执行// work here然后再次释放信号等.等.

这意味着同一个线程在堆栈中越来越深,因此堆栈潜水.

RunContinuationsAsynchronously确保延续不同步运行,因此释放信号量的线程继续运行并且为另一个线程调度延续(哪一个取决于其他延续参数,例如TaskScheduler)

这在逻辑上类似于将完成发布到ThreadPool:

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        Task.Run(() => toRelease.SetResult(true)); 
}
Run Code Online (Sandbox Code Playgroud)

  • 我记得TPL很简单。这似乎是一个默认的调度程序应解决的问题,而无需开发人员以打结的方式来了解另一种情况。 (3认同)
  • @YuvalItzchakov它会在某个时刻停止,所以它不是一个无限循环,但是它仍然可能会到达一个“ StackOverflowException”,而在一个“锁”内部则会导致死锁。 (2认同)
  • @Gusdor TPL可以。实际上,`SemaphoreSlim`并不以这种方式起作用,它会将所有延续都发布到ThreadPool中。使用此选项,您可以自己构建TPL等效构造。您不需要在日常代码中使用它。 (2认同)
  • @YuvalItzchakov在某些时候,每个开发人员现在都需要知道a)用例何时特殊以及b)如何围绕它。 (2认同)
  • @YuvalItzchakov所以整个问题仅限于信号量实现吗?我不敢相信 这种异步的东西已经成为抽象的漏洞。 (2认同)

nos*_*tio 7

这怎么可能导致堆栈跳水?更重要的是,为了解决这个问题,RunContinuationsAsynchronously 将有效地做什么?

i3arnon提供的介绍背后的原因很好的解释RunContinuationsAsynchronously。我的回答与他的相当;事实上,我写这篇文章也是为了我自己的参考(我自己在半年内不会记得这件事的任何微妙之处:)

首先,让我们来看看怎样TaskCompletionSourceRunContinuationsAsynchronously选择是从不同Task.Run(() => tcs.SetResult(result))或喜欢。让我们尝试一个简单的控制台应用程序:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplications
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(100, 100);

            Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId });

            var tcs = new TaskCompletionSource<bool>();

            // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously)
            ContinueWith(1, tcs.Task);
            ContinueWith(2, tcs.Task);
            ContinueWith(3, tcs.Task);

            // test await-style continuations
            ContinueAsync(4, tcs.Task);
            ContinueAsync(5, tcs.Task);
            ContinueAsync(6, tcs.Task);

            Task.Run(() =>
            {
                Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId });
                tcs.TrySetResult(true);
                Thread.Sleep(10000);
            });
            Console.ReadLine();
        }

        // log
        static void Continuation(int id)
        {
            Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId });
            Thread.Sleep(1000);
        }

        // await-style continuation
        static async Task ContinueAsync(int id, Task task)
        {
            await task.ConfigureAwait(false);
            Continuation(id);
        }

        // ContinueWith-style continuation
        static Task ContinueWith(int id, Task task)
        {
            return task.ContinueWith(
                t => Continuation(id),
                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

请注意所有延续如何在TrySetResult被调用的同一线程上同步运行:

开始,{ CurrentManagedThreadId = 1 }
在 SetResult 之前,{ CurrentManagedThreadId = 3 }
{ 继续 = 1,CurrentManagedThreadId = 3 }
{ 继续 = 2,CurrentManagedThreadId = 3 }
{ 继续 = 3,CurrentManagedThreadId = 3 }
{ 继续 = 4,CurrentManagedThreadId = 3 }
{ 继续 = 5,CurrentManagedThreadId = 3 }
{ 继续 = 6,CurrentManagedThreadId = 3 }

现在,如果我们不希望这种情况发生,并且我们希望每个 continuation 异步运行(即,在没有任何同步上下文的情况下,与其他 continuation 并行并可能在另一个线程上运行)怎么办?

await通过安装一个假的临时同步上下文(更多细节在这里),有一个技巧可以为-style continuations做到这一点:

public static class TaskExt
{
    class SimpleSynchronizationContext : SynchronizationContext
    {
        internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
    };

    public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
    {
        if (!asyncAwaitContinuations)
        {
            @this.TrySetResult(result);
            return;
        }

        var sc = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
        try
        {
            @this.TrySetResult(result);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(sc);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,tcs.TrySetResult(true, asyncAwaitContinuations: true)在我们的测试代码中使用:

开始,{ CurrentManagedThreadId = 1 }
在 SetResult 之前,{ CurrentManagedThreadId = 3 }
{ 继续 = 1,CurrentManagedThreadId = 3 }
{ 继续 = 2,CurrentManagedThreadId = 3 }
{ 继续 = 3,CurrentManagedThreadId = 3 }
{ 继续 = 4,CurrentManagedThreadId = 4 }
{ 继续 = 5,CurrentManagedThreadId = 5 }
{ 继续 = 6,CurrentManagedThreadId = 6 }

请注意await延续现在如何并行运行(尽管在所有同步ContinueWith延续之后仍然如此)。

这个asyncAwaitContinuations: true逻辑是一个hack,它只适用于awaitcontinuations。新功能RunContinuationsAsynchronously使其适用于任何类型的延续,附加到TaskCompletionSource.Task.

另一个不错的方面RunContinuationsAsynchronously是,任何await计划在特定同步上下文上恢复的样式延续都将在该上下文上异步运行(使用SynchronizationContext.Post,即使TCS.Task同一上下文上完成(与 的当前行为不同TCS.SetResult)。ContinueWith- 样式延续也将运行由其相应的任务调度程序(最常见的是,TaskScheduler.DefaultTaskScheduler.FromCurrentSynchronizationContext)异步执行。它们不会通过 内联TaskScheduler.TryExecuteTaskInline。我相信 Stephen Toub 在他的博客文章的评论中已经阐明了这一点,并且也可以在 CoreCLR 的 Task.cs 中看到

为什么我们要担心在所有延续上强加异步?

当我处理协同async执行的方法(协同例程)时,我通常需要它。

一个简单的例子是可暂停的异步处理:一个异步进程暂停/恢复另一个的执行。它们的执行工作流在某些awaitTaskCompletionSource同步,并直接或间接用于此类同步。

下面是一些现成的示例代码,它使用了 Stephen Toub 的PauseTokenSource. 在这里,一种async方法StartAndControlWorkAsync启动并定期暂停/恢复另一种async方法,DoWorkAsync。尝试更改asyncAwaitContinuations: trueasyncAwaitContinuations: false并查看逻辑完全中断:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main()
        {
            StartAndControlWorkAsync(CancellationToken.None).Wait();
        }

        // Do some work which can be paused/resumed
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                var step = 0;
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    Console.WriteLine("Working, step: " + step++);
                    await Task.Delay(1000).ConfigureAwait(false);
                    Console.WriteLine("Before await pause.WaitForResumeAsync()");
                    await pause.WaitForResumeAsync();
                    Console.WriteLine("After await pause.WaitForResumeAsync()");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        // Start DoWorkAsync and pause/resume it
        static async Task StartAndControlWorkAsync(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + pts.IsPaused);

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                pts.Resume();
                Console.WriteLine("After resume");
            }
        }

        // Based on Stephen Toub's PauseTokenSource
        // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx
        // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, 
        // the producer-side code has already reached the paused (awaiting) state.
        // E.g. a media player "Pause" button is clicked, gets disabled, playback stops, 
        // and only then "Resume" button gets enabled

        public class PauseTokenSource
        {
            internal static readonly Task s_completedTask = Task.Delay(0);

            readonly object _lock = new Object();

            bool _paused = false;

            TaskCompletionSource<bool> _pauseResponseTcs;
            TaskCompletionSource<bool> _resumeRequestTcs;

            public PauseToken Token { get { return new PauseToken(this); } }

            public bool IsPaused
            {
                get
                {
                    lock (_lock)
                        return _paused;
                }
            }

            // request a resume
            public void Resume()
            {
                TaskCompletionSource<bool> resumeRequestTcs = null;

                lock (_lock)
                {
                    resumeRequestTcs = _resumeRequestTcs;
                    _resumeRequestTcs = null;

                    if (!_paused)
                        return;
                    _paused = false;
                }

                if (resumeRequestTcs != null)
                    resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true);
            }

            // request a pause (completes when paused state confirmed)
            public Task PauseAsync()
            {
                Task responseTask = null;

                lock (_lock)
                {
                    if (_paused)
                        return _pauseResponseTcs.Task;
                    _paused = true;

                    _pauseResponseTcs = new TaskCompletionSource<bool>();
                    responseTask = _pauseResponseTcs.Task;

                    _resumeRequestTcs = null;
                }

                return responseTask;
            }

            // wait for resume request
            internal Task WaitForResumeAsync()
            {
                Task resumeTask = s_completedTask;
                TaskCompletionSource<bool> pauseResponseTcs = null;

                lock (_lock)
                {
                    if (!_paused)
                        return s_completedTask;

                    _resumeRequestTcs = new TaskCompletionSource<bool>();
                    resumeTask = _resumeRequestTcs.Task;

                    pauseResponseTcs = _pauseResponseTcs;

                    _pauseResponseTcs = null;
                }

                if (pauseResponseTcs != null)
                    pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true);

                return resumeTask;
            }
        }

        // consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public bool IsPaused { get { return _source != null && _source.IsPaused; } }

            public Task WaitForResumeAsync()
            {
                return IsPaused ?
                    _source.WaitForResumeAsync() :
                    PauseTokenSource.s_completedTask;
            }
        }


    }

    public static class TaskExt
    {
        class SimpleSynchronizationContext : SynchronizationContext
        {
            internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
        };

        public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
        {
            if (!asyncAwaitContinuations)
            {
                @this.TrySetResult(result);
                return;
            }

            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
            try
            {
                @this.TrySetResult(result);
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我不想在Task.Run(() => tcs.SetResult(result))这里使用,因为ThreadPool当它们已经被安排在具有适当同步上下文的 UI 线程上异步运行时,将延续推入将是多余的。同时,如果双方StartAndControlWorkAsyncDoWorkAsync在相同的用户界面同步上下文中运行,我们就也有一个堆栈潜水(如果tcs.SetResult(result)不使用Task.RunSynchronizationContext.Post包装)。

现在,RunContinuationsAsynchronously可能是这个问题的最佳解决方案。

  • 灿烂!明天我将尝试执行一些代码,看看我是否有任何问题。谢谢! (2认同)