协作暂停异步方法

use*_*019 2 c# asynchronous task-parallel-library async-await pause

我正在使用一个任务来进行长时间运行的异步处理操作,我希望能够在任意时刻暂停和恢复。对我来说幸运的是,Microsoft 的 TPL 自己的作者之一已经提出了这个问题的解决方案。唯一的麻烦是他的解决方案不能正常工作。

当您删除await Task.Delay(100)下面代码中的 时,代码将在第一个请求之后停止接受暂停请求。如果 的值可信的话,代码似乎SomeMethodAsync会在与其他任务相同的线程上恢复执行。Thread.CurrentThread.ManagedThreadId的输出还SomeMethodAsync表明它正在多个线程上运行。

我一直发现 TPL 相当混乱且难以使用,而 async/await 更是如此,所以我很难理解这里到底发生了什么。如果有人能解释的话,我将非常感激。

极简示例代码:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace PauseTokenTest {
    class Program {
        static void Main() {
            var pts = new PauseTokenSource();
            Task.Run(() =>
            {
                while (true) {
                    Console.ReadLine();
                    Console.WriteLine(
                        $"{Thread.CurrentThread.ManagedThreadId}: Pausing task");
                    pts.IsPaused = !pts.IsPaused;
                }
            });
            SomeMethodAsync(pts.Token).Wait();
        }

        public static async Task SomeMethodAsync(PauseToken pause) {
            for (int i = 0; ; i++) {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: {i}");

                // Comment this out and repeatedly pausing and resuming will no longer work.
                await Task.Delay(100);

                await pause.WaitWhilePausedAsync();
            }
        }
    }

    public class PauseTokenSource {
        internal static readonly Task s_completedTask =
            Task.FromResult(true);
        volatile TaskCompletionSource<bool> m_paused;

        public bool IsPaused {
            get { return m_paused != null; }
            set {
                if (value) {
                    Interlocked.CompareExchange(
                        ref m_paused, new TaskCompletionSource<bool>(), null);
                } else {
                    while (true) {
                        var tcs = m_paused;
                        if (tcs == null) return;
                        if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs) {
                            tcs.SetResult(true);
                            break;
                        }
                    }
                }
            }
        }

        public PauseToken Token { get { return new PauseToken(this); } }

        internal Task WaitWhilePausedAsync() {
            var cur = m_paused;
            return cur != null ? cur.Task : s_completedTask;
        }
    }

    public struct PauseToken {
        readonly PauseTokenSource m_source;
        internal PauseToken(PauseTokenSource source) {
            m_source = source;
        }

        public bool IsPaused {
            get { return m_source != null && m_source.IsPaused; }
        }

        public Task WaitWhilePausedAsync() {
            return IsPaused ? m_source.WaitWhilePausedAsync() :
                PauseTokenSource.s_completedTask;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 6

发生这种情况是因为任务延续的执行方式。在框架的许多地方,任务延续都会尽可能同步执行。这并不总是可能的,但它确实变得常见的一个地方是线程池线程,这些线程通常被视为可交换的。

await是使用同步延续的地方之一;据我所知,这没有在任何地方正式记录,但我在我的博客上描述了这种行为。

因此,本质上发生的事情是:SomeMethodAsync暂停(从- 没有上下文await返回的任务),它将其余部分附加为该任务的延续。然后,当(线程池)线程切换时,它完成从 返回的任务。然后,运行时查看与该延续关联的上下文(无),以便该延续被视为与任何线程池线程兼容。嘿,当前线程是线程池线程!所以它只是直接运行延续。WaitWhilePausedAsyncSomeMethodAsyncTask.RunIsPausedWaitWhilePausedAsync

有趣的是,如果没有另一个 await,该SomeMethodAsync方法将变得完全同步:它总是从 中检索已经完成的任务WaitWhilePausedAsync,因此它只是永远继续执行其无限循环。提醒一下,await其行为与已完成的任务同步(正如我在博客中所述)。这个无限循环在setter内运行IsPaused,因此Task.Run代码永远不会继续循环以ReadLine再次调用。

如果SomeMethodAsync 确实有一个await,那么它可以异步运行,返回到其调用者(完成延续),并允许循环Task.Run继续执行。

正如 Theodor Zoulias 所建议的,将TaskCreationOptions.RunContinuationsAsynchronously标志传递给遗嘱TaskCompletionSource<bool>也是可行的。在这种情况下,任务延续是异步运行的(在单独的线程池线程上),而不是直接在调用线程上执行。

IIRC,您引用的博客文章早于该RunContinuationsAsynchronously标志。它也早于(非通用)TaskCompletionSource.

另外,正如 Theodor Zoulias 所建议的,我的Nito.AsyncExPauseTokenSource库中有一个可以避免此问题的库。它还使用.RunContinuationsAsynchronously

  • 另外:这个问题的时机很有趣!碰巧我下周将[发表演讲](https://that.us/activities/9h2sITZbUA8BZwaXEoBw/?utm_content=249202691&amp;hss_channel=tw-184857087&amp;utm_medium=social&amp;utm_source=twitter&amp;utm_campaign=Wisconsin%20%2723),其中包括实施暂停令牌以及其他高级异步主题。 (2认同)