Eoi*_*ell 17 .net c# wpf task-parallel-library blockingcollection
我使用任务并行库,可量化和可重复的问题BlockingCollection<T>,ConcurrentQueue<T>与GetConsumingEnumerable试图创建一个简单的管道.
简而言之,从一个线程向默认值BlockingCollection<T>(引擎盖下依赖于a ConcurrentQueue<T>)添加条目并不能保证它们会BlockingCollection<T>从调用GetConsumingEnumerable()Method的另一个线程中弹出.
我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上.
Timer1负责排队工作项...它使用一个被调用的并发字典,_tracker以便它知道它已经添加到阻塞集合中的内容.Timer2只记录两个BlockingCollection&的计数状态_trackerParalell.ForEach简单地遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框的按钮.Timer1阻止将更多条目添加到阻止集合中.public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
Run Code Online (Sandbox Code Playgroud)
这是事件的顺序:

您可以看到并发字典仍在跟踪尚未处理的1个项目以及随后从中删除的项目 _tracker
如果我再次按Start,则timer1开始添加更多3个条目,并行循环恢复生命打印5,6,7和8.

我完全不知道为什么会这样.再次调用启动显然会调用newtask,它调用Paralell foreach,并重新执行GetConsumingEnumerable(),它会神奇地找到丢失的条目......
为什么BlockingCollection.GetConsumingEnumerable()不保证迭代添加到集合中的每个项目.
为什么随后添加更多条目会导致它"无法"并继续处理?
adr*_*anm 19
您不能使用GetConsumingEnumerable()在Parallel.ForEach().
使用GetConsumingPartitioner从第三方物流演员
在博客文章中,您还将获得解释为什么不能使用 GetConsumingEnumerable()
Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块),然后释放锁.
即Parallel.ForEach等到它继续之前收到一组工作项.正是您的实验所显示的内容.
从.net 4.5开始,您可以创建一个一次只能使用1个项目的分区程序:
var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}
Run Code Online (Sandbox Code Playgroud)