取消BlockingCollection.GetConsumingEnumerable()并处理剩下的内容

Jas*_*son 7 .net c# multithreading blockingcollection

我有一个流程生成工作,第二个流程使用BlockingCollection<>这个工作.当我关闭我的程序时,我需要我的消费者停止消费工作,但我仍然需要快速记录待处理但尚未消耗的工作.

现在,我的消费者产生了一个有foreach (<object> in BlockingCollection.GetConsumingEnumerable())循环的线程.当我停止我的程序时,我的制作人打电话Consumer.BlockingCollection.CompleteAdding().我发现我的消费者继续处理队列中的所有内容.

谷歌搜索问题告诉我,我需要使用CancellationToken.所以我试了一下:

private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我的制作人有:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();
Run Code Online (Sandbox Code Playgroud)

当我尝试这个时,我没有得到OperationCancelledException发生的迹象.

这个问题试图解释使用取消令牌,但似乎它没有正确使用它.(论证:如果它有效,那么它"足够正确".)这个问题似乎与我的问题完全重复,但没有例子.(同样在这里.)

因此,要重申:如何正确使用一CancellationTokenBlockingCollection.GetConsumingEnumerable()与我需要处理其余的项目在队列中它被使用不同的方法取消后的警告?

(我认为我的问题集中在正确使用CancellationToken.我的测试都没有表明该进程实际上被取消了.(StopFlag.IsCancellationRequested总是等于false.))

Ser*_*rvy 5

当您将传递给时CancellationTokenGetConsumingEnumerable它不会抛出请求取消的异常,它只会停止吐出物品。而不是捕获异常,而是检查令牌:

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }
Run Code Online (Sandbox Code Playgroud)

还要注意,如果请求取消,并且有可能CompletedAdding没有被调用,那么您应该仅迭代集合,而不是call GetConsumingEnumerable。如果您知道取消操作后生产者将完成添加,那么这不是问题。

  • 当请求取消时,GetConsumingEnumerable将引发异常 (3认同)

Jas*_*son 3

我的问题在于我如何尝试取消该操作。我没有让生产者拥有 CancellationTokenSource,而是将其全部放在消费者中。

public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)