BlockingCollection(T).GetConsumingEnumerable()如何抛出OperationCanceledException?

Asi*_*sik 9 c# task-parallel-library

我正在使用BlockingCollection来实现任务调度程序,基本上:

public class DedicatedThreadScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();

    readonly Thread m_thread;


    public DedicatedThreadScheduler()
    {
        m_thread = new Thread(() =>
        {
            foreach (var task in m_taskQueue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
            m_taskQueue.Dispose();
        });
        m_thread.Start();
    }

    public void Dispose()
    {
        m_taskQueue.CompleteAdding();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }

    (...)
}
Run Code Online (Sandbox Code Playgroud)

我只看过一次并且无法重现这一点,但是在foreach的某个时刻(在TryTakeWithNoTimeValidation中)我得到了一个OperationCanceledException.我不明白,因为我正在使用不采用CancellationToken的重载,并且文档声明它可能只抛出ObjectDisposedException.这个例外是什么意思?封锁收集完成了吗?队列中的任务被取消了?

更新:调用堆栈如下所示:

mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, System.Threading.CancellationToken cancellationToken) + 0x36 bytes 
mscorlib.dll!System.Threading.SemaphoreSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x178 bytes   
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.TryTakeWithNoTimeValidation(out System.Threading.Tasks.Task item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken, System.Threading.CancellationTokenSource combinedTokenSource) Line 710 + 0x25 bytes   C#
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.GetConsumingEnumerable(System.Threading.CancellationToken cancellationToken) Line 1677 + 0x18 bytes    C#
Run Code Online (Sandbox Code Playgroud)

0b1*_*010 12

这是一个老问题,但我会为将来发现它的人添加完整的答案.Eugene提供的答案部分正确; 当时你必须使用Visual Studio进行调试,配置为打破已处理的框架异常.

但是,您打破的实际原因OperationCanceledException是代码BlockingCollection<T>.CompleteAdding()看起来像这样:

    public void CompleteAdding()
    {
        int num;
        this.CheckDisposed();
        if (this.IsAddingCompleted)
        {
            return;
        }
        SpinWait wait = new SpinWait();
    Label_0017:
        num = this.m_currentAdders;
        if ((num & -2147483648) != 0)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
        }
        else if (Interlocked.CompareExchange(ref this.m_currentAdders, num | -2147483648, num) == num)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
            if (this.Count == 0)
            {
                this.CancelWaitingConsumers();
            }
            this.CancelWaitingProducers();
        }
        else
        {
            wait.SpinOnce();
            goto Label_0017;
        }
    }
Run Code Online (Sandbox Code Playgroud)

注意这些特定的行:

if (this.Count == 0)
{
    this.CancelWaitingConsumers();
}
Run Code Online (Sandbox Code Playgroud)

调用此方法:

private void CancelWaitingConsumers()
{
    this.m_ConsumersCancellationTokenSource.Cancel();
}
Run Code Online (Sandbox Code Playgroud)

因此,即使您未CancellationToken在代码中明确使用a ,底层框架代码OperationCanceledException也会在调用BlockingCollection时抛出if CompleteAdding().这样做是GetConsumingEnumerable()为了表示退出的方法.该异常由框架代码处理,如果您没有将调试器配置为拦截它,您就不会注意到它.

你无法复制它的原因是因为你放置你的电话给CompleteAdding()你的Dispose()方法.因此,它在GC的一时兴起被召唤.