A pattern for self-cancelling and restarting a task

nos*_*tio 16 .net c# asynchronous task-parallel-library async-await

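Is there a recommended pattern for tasks that cancel themselves and restart?

For example, I'm working on the API for a background spellchecker. A spellcheck session is wrapped as a Task. Every new session should cancel the previous one and wait for its termination (so that resources such as the spellcheck service provider can be properly reused).

I've come up with something like this: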

class Spellchecker
{
    Task pendingTask = null; // pending session
    CancellationTokenSource cts = null; // CTS for pending session

    // SpellcheckAsync is called by the client app
    public async Task<bool> SpellcheckAsync(CancellationToken token)
    {
        // SpellcheckAsync can be re-entered
        var previousCts = this.cts;
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        this.cts = newCts;

        if (IsPendingSession())
        {
            // cancel the previous session and wait for its termination
            if (!previousCts.IsCancellationRequested)
                previousCts.Cancel();
            // this is not expected to throw
            // as the task is wrapped with ContinueWith
            await this.pendingTask; 
        }

        newCts.Token.ThrowIfCancellationRequested();
        var newTask = SpellcheckAsyncHelper(newCts.Token);

        this.pendingTask = newTask.ContinueWith((t) => {
            this.pendingTask = null;
            // we don't need to know the result here, just log the status
            Debug.Print(((object)t.Exception ?? (object)t.Status).ToString());
        }, TaskContinuationOptions.ExecuteSynchronously);

        return await newTask;
    }

    // the actual task logic
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
    {
        // do not start a new session if the previous one is still pending
        if (IsPendingSession())
            throw new ApplicationException("Cancel the previous session first.");

        // do the work (pretty much IO-bound)
        try
        {
            bool doMore = true;
            while (doMore)
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(500); // placeholder to call the provider
            }
            return doMore;
        }
        finally
        {
            // clean-up the resources
        }
    }

    public bool IsPendingSession()
    {
        return this.pendingTask != null &&
            !this.pendingTask.IsCompleted &&
            !this.pendingTask.IsCanceled &&
            !this.pendingTask.IsFaulted;
    }
}

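The client app (the UI) should be able to call SpellcheckAsync as many times as it wants without worrying about cancelling a pending session. The main doMore loop runs on the UI thread (because it involves the UI, while all spellcheck service provider calls are IO-bound).

I'm a bit uncomfortable about having to split the API into two pieces, SpellcheckAsync and SpellcheckAsyncHelper, but I can't think of a better way to do it, and it has yet to be tested.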

Ste*_*ary 20

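I think the general concept is pretty good, though I recommend you not use ContinueWith.

I'd just write it using regular await, and a lot of the "am I already running" logic is not necessary: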

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session

// SpellcheckAsync is called by the client app on the UI thread
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
    // SpellcheckAsync can be re-entered
    var previousCts = this.cts;
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
    this.cts = newCts;

    if (previousCts != null)
    {
        // cancel the previous session and wait for its termination
        previousCts.Cancel();
        try { await this.pendingTask; } catch { }
    }

    newCts.Token.ThrowIfCancellationRequested();
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
    return await this.pendingTask;
}

// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
    // do the work (pretty much IO-bound)
    using (...)
    {
        bool doMore = true;
        while (doMore)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(500); // placeholder to call the provider
        }
        return doMore;
    }
}
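
To see the await-based version behave as described, here is a minimal console sketch. It copies the SpellcheckAsync logic above and makes a few assumptions that are not in the answer: the helper runs a finite three-step loop instead of `while (doMore)`, the `using (...)` resource block is left out, and `Program` and `Describe` are hypothetical names added for the demo. A console app has no UI synchronization context, so this only approximates the single-threaded re-entrancy the answer assumes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Spellchecker
{
    Task<bool> pendingTask = null;       // pending session
    CancellationTokenSource cts = null;  // CTS for pending session

    public async Task<bool> SpellcheckAsync(CancellationToken token)
    {
        // SpellcheckAsync can be re-entered
        var previousCts = this.cts;
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        this.cts = newCts;

        if (previousCts != null)
        {
            // cancel the previous session and wait for its termination
            previousCts.Cancel();
            try { await this.pendingTask; } catch { }
        }

        // a newer call may have cancelled this one while it was waiting
        newCts.Token.ThrowIfCancellationRequested();
        this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
        return await this.pendingTask;
    }

    // Assumption: a finite stand-in for the real work, so the demo terminates.
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
    {
        for (int step = 0; step < 3; step++)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(100); // placeholder to call the provider
        }
        return true;
    }
}

static class Program
{
    // Hypothetical helper: turns a session's outcome into a label.
    public static async Task<string> Describe(Task<bool> session)
    {
        try { await session; return "completed"; }
        catch (OperationCanceledException) { return "cancelled"; }
    }

    static async Task Main()
    {
        var spellchecker = new Spellchecker();

        // Three back-to-back calls: each one supersedes the previous session.
        var first = spellchecker.SpellcheckAsync(CancellationToken.None);
        var second = spellchecker.SpellcheckAsync(CancellationToken.None);
        var third = spellchecker.SpellcheckAsync(CancellationToken.None);

        Console.WriteLine(string.Join(", ",
            await Describe(first), await Describe(second), await Describe(third)));
    }
}
```

With the three calls issued back to back, the first two sessions are expected to end as cancelled and only the last one to run to completion. The second call never starts its helper at all, because its own token is already cancelled by the time the first session has wound down.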

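  • @Stephen Cleary, I have great respect for your work on async, so please don't take this the wrong way: I'm just curious. I'm a little surprised that you didn't rewrite the `await this.pendingTask` part using `SemaphoreSlim`, your own `AsyncLock`, or something similar. Do you generally consider improving thread safety in the "synchronous" part of an async method to be premature optimization? (2 upvotes)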
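
For readers wondering what the `SemaphoreSlim` variant raised in this comment might look like, here is one possible sketch. It is not from the answer and is not something the answer's author endorsed. `SerializedWorker`, `RunAsync`, and the `work` delegate are hypothetical names, and the finite three-step loop in `Main` is an assumption made so the demo terminates. The semaphore is held for the lifetime of a session, so a new caller cannot start until the cancelled session has released it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type: runs at most one session at a time, newest call wins.
class SerializedWorker
{
    readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
    CancellationTokenSource cts; // CTS of the most recent call

    public async Task<bool> RunAsync(
        Func<CancellationToken, Task<bool>> work, CancellationToken token)
    {
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);

        // Atomically publish the new CTS and cancel whichever call came before.
        var previousCts = Interlocked.Exchange(ref this.cts, newCts);
        previousCts?.Cancel();

        // Wait until the previous session has released the gate. If a newer
        // call cancels this one first, WaitAsync throws and the gate is never
        // taken, so no Release is needed on that path.
        await this.gate.WaitAsync(newCts.Token);
        try
        {
            newCts.Token.ThrowIfCancellationRequested();
            return await work(newCts.Token);
        }
        finally
        {
            this.gate.Release();
        }
    }
}

static class Program
{
    // Hypothetical helper: turns a session's outcome into a label.
    public static async Task<string> Describe(Task<bool> session)
    {
        try { await session; return "completed"; }
        catch (OperationCanceledException) { return "cancelled"; }
    }

    static async Task Main()
    {
        var worker = new SerializedWorker();

        // Assumption: a finite stand-in for the spellcheck loop.
        Func<CancellationToken, Task<bool>> work = async ct =>
        {
            for (int step = 0; step < 3; step++)
            {
                ct.ThrowIfCancellationRequested();
                await Task.Delay(100);
            }
            return true;
        };

        var first = worker.RunAsync(work, CancellationToken.None);
        var second = worker.RunAsync(work, CancellationToken.None);
        var third = worker.RunAsync(work, CancellationToken.None);

        Console.WriteLine(string.Join(", ",
            await Describe(first), await Describe(second), await Describe(third)));
    }
}
```

The trade-off compared with the answer's code is that a superseded caller returns as soon as it is cancelled instead of waiting for the older session to finish, while the session that actually runs still starts only after the gate is free. This sketch does not dispose the `CancellationTokenSource` instances, which a production version would likely want to handle.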

nos*_*tio 5

Here is the most recent version of the cancel-and-restart pattern that I use:

class AsyncWorker
{
    Task _pendingTask;
    CancellationTokenSource _pendingTaskCts;

    // the actual worker task
    async Task DoWorkAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        Debug.WriteLine("Start.");
        await Task.Delay(100, token);
        Debug.WriteLine("Done.");
    }

    // start/restart
    public void Start(CancellationToken token)
    {
        var previousTask = _pendingTask;
        var previousTaskCts = _pendingTaskCts;

        var thisTaskCts = CancellationTokenSource.CreateLinkedTokenSource(token);

        _pendingTask = null;
        _pendingTaskCts = thisTaskCts;

        // cancel the previous task
        if (previousTask != null && !previousTask.IsCompleted)
            previousTaskCts.Cancel();

        Func<Task> runAsync = async () =>
        {
            // await the previous task (cancellation requested)
            if (previousTask != null)
                await previousTask.WaitObservingCancellationAsync();

            // if there's a newer task started with Start, this one should be cancelled
            thisTaskCts.Token.ThrowIfCancellationRequested();

            await DoWorkAsync(thisTaskCts.Token).WaitObservingCancellationAsync();
        };

        _pendingTask = Task.Factory.StartNew(
            runAsync,
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
    }

    // stop
    public void Stop()
    {
        if (_pendingTask == null)
            return;

        if (_pendingTask.IsCanceled)
            return;

        if (_pendingTask.IsFaulted)
            _pendingTask.Wait(); // instantly throw an exception

        if (!_pendingTask.IsCompleted)
        {
            // still running, request cancellation 
            if (!_pendingTaskCts.IsCancellationRequested)
                _pendingTaskCts.Cancel();

            // wait for completion
            if (System.Threading.Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA)
            {
                // MTA, blocking wait
                _pendingTask.WaitObservingCancellation();
            }
            else
            {
                // TODO: STA, async to sync wait bridge with DoEvents,
                // similarly to Thread.Join
            }
        }
    }
}

// useful extensions
public static class Extras
{
    // check if exception is OperationCanceledException
    public static bool IsOperationCanceledException(this Exception ex)
    {
        if (ex is OperationCanceledException)
            return true;

        var aggEx = ex as AggregateException;
        return aggEx != null && aggEx.InnerException is OperationCanceledException;
    }

    // wait asynchronously for the task to complete and observe exceptions
    public static async Task WaitObservingCancellationAsync(this Task task)
    {
        try
        {
            await task;
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }

    // wait for the task to complete and observe exceptions
    public static void WaitObservingCancellation(this Task task)
    {
        try
        {
            task.Wait();
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }
}

Test usage (this produces only a single "Start."/"Done." pair of output lines from DoWorkAsync):

private void MainForm_Load(object sender, EventArgs e)
{
    var worker = new AsyncWorker();
    for (var i = 0; i < 10; i++)
        worker.Start(CancellationToken.None);
}

  • A newer, functional version of this pattern is here: http://stackoverflow.com/a/21427264/1768303 (2 upvotes)