Jam*_*ack 2 concurrency pfx plinq c#-4.0
我在我的 .NET 4 应用程序中使用了并行数据结构,并且ConcurrentQueue在我处理它时添加了一个。
我想做类似的事情:
personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );
当我调用数据库来保存数据时,我限制了并发线程的数量。
但是,我希望ForAll不会出队,我担心只是做
ForAll(i => {
personqueue.personqueue.TryDequeue(...);
...
});
Run Code Online (Sandbox Code Playgroud)
因为不能保证我弹出正确的一个。
那么,我如何以并行方式遍历收集和出队。
或者,使用PLINQ并行进行此处理会更好吗?
好吧,我不是 100% 确定您尝试在这里存档的内容。您是否试图将所有项目出列直到什么都没有?或者只是一次性出列大量项目?
第一个可能出乎意料的行为始于以下语句:
theQueue.AsParallel()
Run Code Online (Sandbox Code Playgroud)
对于 ConcurrentQueue,您将获得一个“快照”-枚举器。因此,当您迭代并发堆栈时,您只会迭代快照,而不是“实时”队列。
一般来说,我认为迭代您在迭代期间正在更改的内容并不是一个好主意。
所以另一个解决方案看起来像这样:
// this way it's more clear, that we only deque for theQueue.Count items
// However after this, the queue is probably not empty
// or maybe the queue is also empty earlier
Parallel.For(0, theQueue.Count,
new ParallelOptions() {MaxDegreeOfParallelism = 20},
() => {
theQueue.TryDequeue(); //and stuff
});
Run Code Online (Sandbox Code Playgroud)
这避免了在迭代时操作某些东西。但是,在该语句之后,队列仍然可以包含在 for 循环期间添加的数据。
要及时使队列为空,您可能需要做更多的工作。这是一个非常丑陋的解决方案。当队列仍有项目时,创建新任务。每个任务都尽可能地从队列中出队。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过 20 个任务。
// Probably a kitty died because of this ugly code ;)
// However, this code tries to get the queue empty in a very aggressive way
Action consumeFromQueue = () =>
{
while (tt.TryDequeue())
{
; // do your stuff
}
};
var allRunningTasks = new Task[MaxParallism];
for(int i=0;i<MaxParallism && tt.Count>0;i++)
{
allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);
}
Task.WaitAll(allRunningTasks);
Run Code Online (Sandbox Code Playgroud)