为什么迭代GetConsumingEnumerable()不会完全清空底层的阻塞集合

Eoi*_*ell 17 .net c# wpf task-parallel-library blockingcollection

我使用任务并行库,可量化和可重复的问题BlockingCollection<T>,ConcurrentQueue<T>GetConsumingEnumerable试图创建一个简单的管道.

简而言之,从一个线程向默认值BlockingCollection<T>(引擎盖下依赖于a ConcurrentQueue<T>)添加条目并不能保证它们会BlockingCollection<T>从调用GetConsumingEnumerable()Method的另一个线程中弹出.

我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上.

  • Timer1负责排队工作项...它使用一个被调用的并发字典,_tracker以便它知道它已经添加到阻塞集合中的内容.
  • Timer2只记录两个BlockingCollection&的计数状态_tracker
  • START按钮启动一个Paralell.ForEach简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框的按钮.
  • STOP按钮会停止Timer1阻止将更多条目添加到阻止集合中.
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
Run Code Online (Sandbox Code Playgroud)

这是事件的顺序:

  • 按开始
  • Timer1滴答和ListBox1立即更新3条消息(添加0,1,2)
  • ListBox2随后更新为3条消息,相隔1秒
    • 处理0
    • 处理1
    • 处理2
  • Timer1滴答和ListBox1立即更新3条消息(添加3,4,5)
  • ListBox2随后更新了2条消息,相隔1秒
    • 处理3
    • 处理4
    • 处理5未打印...似乎已"失踪"
  • 按STOP可防止计时器1添加更多消息
  • 等待......"处理5"仍然没有出现

缺少参赛作品

您可以看到并发字典仍在跟踪尚未处理的1个项目以及随后从中删除的项目 _tracker

如果我再次按Start,则timer1开始添加更多3个条目,并行循环恢复生命打印5,6,7和8.

在后面的项目推进后面的条目返回

我完全不知道为什么会这样.再次调用启动显然会调用newtask,它调用Paralell foreach,并重新执行GetConsumingEnumerable(),它会神奇地找到丢失的条目......

为什么BlockingCollection.GetConsumingEnumerable()不保证迭代添加到集合中的每个项目.

为什么随后添加更多条目会导致它"无法"并继续处理?

adr*_*anm 19

您不能使用GetConsumingEnumerable()Parallel.ForEach().

使用GetConsumingPartitioner第三方物流演员

在博客文章中,您还将获得解释为什么不能使用 GetConsumingEnumerable()

Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块),然后释放锁.

即Parallel.ForEach等到它继续之前收到一组工作项.正是您的实验所显示的内容.

  • TPL附加功能属于MS-LPL许可证,这意味着如果您使用它们,则将整个衍生作品锁定到Windows.它不是OSI批准的许可证...... (2认同)

Alo*_*zo2 7

从.net 4.5开始,您可以创建一个一次只能使用1个项目的分区程序:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}
Run Code Online (Sandbox Code Playgroud)

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx