暂停/恢复异步任务的模式?

nos*_*tio 23 .net c# asynchronous async-await

我有一个主要是IO绑定的连续任务(后台拼写检查器与拼写检查服务器通信).有时,此任务需要暂停并稍后恢复,具体取决于用户活动.

虽然暂停/恢复基本上是什么async/await,但我发现很少有关于如何为异步方法实现实际暂停/播放逻辑的信息.有推荐的模式吗?

我也考虑过使用Stephen Toub AsyncManualResetEvent,但认为这可能是一种矫枉过正.

svi*_*ick 9

AsyncManualResetEvent考虑到当前代码的混乱程度,这正是您所需要的.但是稍微好一点的解决方案是使用Stephen Toub的另一种方法:PauseToken.它的工作方式类似AsyncManualResetEvent,除了它的接口专门用于此目的.


Ben*_*pka 9

当涉及到通过持有线程进行异步/等待编程时,所有其他答案似乎要么很复杂,要么没有抓住重点,因为线程占用 CPU 资源很昂贵,并且可能导致死锁。经过大量的试验、错误和许多死锁,这终于适用于我的高使用率测试。

var isWaiting = true;
while (isWaiting)
{
    try
    {
        //A long delay is key here to prevent the task system from holding the thread.
        //The cancellation token allows the work to resume with a notification 
        //from the CancellationTokenSource.
        await Task.Delay(10000, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        //Catch the cancellation and it turns into continuation
        isWaiting = false;
    }
}
Run Code Online (Sandbox Code Playgroud)


nos*_*tio 5

更新于2019年,我最近有机会重新访问此代码,以下是作为控制台应用程序的完整示例(警告:PauseTokenSource需要良好的单元测试)。

注意,在我的情况下,要求是当使用者方代码(请求暂停)将继续时,生产者方代码应该已经达到暂停状态。因此,到UI准备好反映暂停状态时,所有后台活动都应该已经暂停了。

using System;
using System.Threading.Tasks;
using System.Threading;

namespace Console_19613444
{
    class Program
    {
        // PauseTokenSource
        public class PauseTokenSource
        {
            bool _paused = false;
            bool _pauseRequested = false;

            TaskCompletionSource<bool> _resumeRequestTcs;
            TaskCompletionSource<bool> _pauseConfirmationTcs;

            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);

            public PauseToken Token { get { return new PauseToken(this); } }

            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        var resumeRequestTcs = _resumeRequestTcs;
                        _paused = false;
                        _pauseRequested = false;
                        _resumeRequestTcs = null;
                        _pauseConfirmationTcs = null;
                        resumeRequestTcs.TrySetResult(true);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }

                    Task pauseConfirmationTask = null;

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }

                    await pauseConfirmationTask;

                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            private async Task WaitForResumeRequestAsync(CancellationToken token)
            {
                using (token.Register(() => _resumeRequestTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _resumeRequestTcs.Task;
                }
            }

            private async Task WaitForPauseConfirmationAsync(CancellationToken token)
            {
                using (token.Register(() => _pauseConfirmationTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _pauseConfirmationTcs.Task;
                }
            }

            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;

                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    resumeRequestTask = WaitForResumeRequestAsync(token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }

                await resumeRequestTask;
            }
        }

        // PauseToken - consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public Task<bool> IsPaused() { return _source.IsPaused(); }

            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source.PauseIfRequestedAsync(token);
            }
        }

        // Basic usage

        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();

                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    await pause.PauseIfRequestedAsync();
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");

                    await Task.Delay(1000);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                await pts.ResumeAsync();
                Console.WriteLine("After resume");
            }
        }

        static async Task Main()
        {
            await Test(CancellationToken.None);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


Meh*_*pçu 5

它对我有用

        using System;

        using System.Threading;
        using System.Threading.Tasks;

        namespace TaskTest2
        {

            class Program
            {
                static ManualResetEvent mre = new ManualResetEvent(false);
                static void Main(string[] args)
                {

                   mre.Set();
                   Task.Factory.StartNew(() =>
                    {
                        while (true)
                        {
                            Console.WriteLine("________________");
                            mre.WaitOne();
                        }

                    } );

                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");
                    Thread.Sleep(10000);
                    Console.WriteLine("Task Will Resume After 1 Second");
                    Thread.Sleep(1000);
                    mre.Set();

                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");


                    Console.Read();
                }
            }
        }
Run Code Online (Sandbox Code Playgroud)

  • 不要混合使用 WaitOne 和 async/await。这个解决方案与我第一次在项目中实现的解决方案非常相似,它最终在生产中持续了大约一年,但它在线程匮乏的环境中存在缺陷。WaitOne 占用线程,而不是在等待时释放它,可能会导致 CPU 使用率高和死锁。 (2认同)