取消长时间运行任务后如何正确清理

Mat*_*ttC 6 c# concurrency task-parallel-library cancellation c#-4.0

我创建了一个类,其目的是抽象出对队列的并发访问的控制.

该类被设计为在单个线程上实例化,由多个线程写入,然后从后续的单个线程读取.

我在类中生成了一个长时间运行的任务,它将执行阻塞循环并在项成功出列时触发事件.

我的问题是:我执行取消长时间运行的任务并随后清理/重置CancellationTokenSource对象的正确用法吗?

理想情况下,我希望能够在保持可用性添加到队列的同时停止并重新启动活动对象.

我用Peter Bromberg的文章作为基础:C#4.0中的生产者/消费者队列和BlockingCollection

代码如下:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    public delegate void DeliverNextQueuedItemHandler<T>(T item);

public sealed class SOQueueManagerT<T> 
{

    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning { get; private set; }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public CancellationTokenSource CancellationTokenSource
    {
        get
        {
            if (_canceller == null)
                _canceller = new CancellationTokenSource();

            return _canceller;
        }
    }

    public SOQueueManagerT()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);

        IsRunning = false;
    }

    public void Start()
    {
        if (_listener == null)
        {


            IsRunning = true;

            _listener = Task.Factory.StartNew(() =>
            {

                while (!CancellationTokenSource.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {

                            OnNextItem(item);
                        }
                    }

                }
            },
            CancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            CancellationTokenSource.Cancel();
            CleanUp();
        }
    }

    public void Add(T item)
    {
        _queue.Add(item);
    }

    private void CleanUp()
    {
        _listener.Wait(2000);
        if (_listener.IsCompleted)
        {
            IsRunning = false;
            _listener = null;
            _canceller = null;
        }
    }


 }
}
Run Code Online (Sandbox Code Playgroud)

更新 这是我最后一起去的地方.它并不完美,但到目前为止还在做这项工作.

public sealed class TaskQueueManager<T> 
{
    ConcurrentQueue<T> _multiQueue;
    BlockingCollection<T> _queue;
    CancellationTokenSource _canceller;
    Task _listener = null;

    public event DeliverNextQueuedItemHandler<T> OnNextItem;

    public bool IsRunning
    {
        get
        {
            if (_listener == null)
                return false;
            else if (_listener.Status == TaskStatus.Running ||
                _listener.Status == TaskStatus.Created ||
                _listener.Status == TaskStatus.WaitingForActivation ||
                _listener.Status == TaskStatus.WaitingToRun ||
                _listener.IsCanceled)
                return true;
            else
                return false;
        }
    }
    public int QueueSize
    {
        get
        {
            if (_queue != null)
                return _queue.Count;
            return -1;
        }
    }

    public TaskQueueManager()
    {
        _multiQueue = new ConcurrentQueue<T>();
        _queue = new BlockingCollection<T>(_multiQueue);
    }

    public void Start()
    {
        if (_listener == null)
        {
            _canceller = new CancellationTokenSource();

            _listener = Task.Factory.StartNew(() =>
            {
                while (!_canceller.Token.IsCancellationRequested)
                {
                    T item;
                    if (_queue.TryTake(out item, 100))
                    {
                        if (OnNextItem != null)
                        {
                            try
                            {
                                OnNextItem(item);
                            }
                            catch (Exception e)
                            {
                                //log or call an event
                            }
                        }
                    }
                }
            },
            _canceller.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }
    }

    public void Stop()
    {
        if (_listener != null)
        {
            _canceller.Cancel();

            if (_listener.IsCanceled && !_listener.IsCompleted)
                _listener.Wait();

            _listener = null;
            _canceller = null;
        }
    }

    public void Add(T item)
    {
        if (item != null)
        {
            _queue.Add(item);
        }
        else
        {
            throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Joh*_*ren 1

仔细的编程是唯一能够解决问题的方法。即使您取消操作,您也可能有一个未在流行时间内完成的待处理操作。这很可能是一个陷入僵局的阻塞操作。在这种情况下,您的程序实际上不会终止。

例如,如果我多次调用 CleanUp 方法或者没有先调用 Start,我就会感觉它会崩溃。

清理期间的 2 秒超时,感觉比计划的更随意,而且我实际上会尽可能确保事物正确关闭或崩溃/挂起(你永远不想让并发事物处于未知状态)。

此外,IsRunning是显式设置的,而不是从对象的状态推断的。

为了获得灵感,我希望您看看我最近编写的一个类似的类,它是一个生产者/消费者模式,在后台线程中工作。您可以在CodePlex上找到该源代码。不过,这是为了解决一个非常具体的问题而设计的。

在这里,取消是通过查询只有工作线程识别的特定类型来解决的,从而开始关闭。这也确保我永远不会取消待处理的工作,只考虑整个工作单元。

为了稍微改善这种情况,您可以为当前工作设置一个单独的计时器,并在取消时中止或回滚未完成的工作。现在,实现类似事务的行为将需要一些尝试和错误,因为您需要查看每个可能的极端情况并问自己,如果程序在这里崩溃会发生什么?理想情况下,所有这些代码路径都会导致可恢复或已知状态,您可以从中恢复工作。但正如我想您已经猜到的那样,这将需要仔细的编程和大量的测试。