使用 ConcurrentQueue 时,尝试在并行循环时出队

Jam*_*ack 2 concurrency pfx plinq c#-4.0

我在我的 .NET 4 应用程序中使用了并行数据结构,并且ConcurrentQueue在我处理它时添加了一个。

我想做类似的事情:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

当我调用数据库来保存数据时,我限制了并发线程的数量。

但是,我希望ForAll不会出队,我担心只是做

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});
Run Code Online (Sandbox Code Playgroud)

因为不能保证我弹出正确的一个。

那么,我如何以并行方式遍历收集和出队。

或者,使用PLINQ并行进行此处理会更好吗?

Gam*_*lor 5

好吧,我不是 100% 确定您尝试在这里存档的内容。您是否试图将所有项目出列直到什么都没有?或者只是一次性出列大量项目?

第一个可能出乎意料的行为始于以下语句:

 theQueue.AsParallel()
Run Code Online (Sandbox Code Playgroud)

对于 ConcurrentQueue,您将获得一个“快照”-枚举器。因此,当您迭代并发堆栈时,您只会迭代快照,而不是“实时”队列。

一般来说,我认为迭代您在迭代期间正在更改的内容并不是一个好主意。

所以另一个解决方案看起来像这样:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });
Run Code Online (Sandbox Code Playgroud)

这避免了在迭代时操作某些东西。但是,在该语句之后,队列仍然可以包含在 for 循环期间添加的数据。

要及时使队列为空,您可能需要做更多的工作。这是一个非常丑陋的解决方案。当队列仍有项目时,创建新任务。每个任务都尽可能地从队列中出队。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过 20 个任务。

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
Run Code Online (Sandbox Code Playgroud)