Parallel.ForEach与BlockingCollection集成时停滞不前

use*_*715 6 .net c# parallel-processing task-parallel-library

我根据这个问题中的代码采用了并行/消费者的实现

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}
Run Code Online (Sandbox Code Playgroud)

这是一个测试代码

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}
Run Code Online (Sandbox Code Playgroud)

测试总是失败 - 它总是停留在测试的100个项目的第93项.知道我的代码的哪一部分引起了这个问题,以及如何解决它?

Mat*_*son 8

您不能使用Parallel.Foreach()BlockingCollection.GetConsumingEnumerable(),因为你已经发现.

有关解释,请参阅此博客文章:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

该博客还提供了一个GetConsumingPartitioner()可用于解决问题的方法的源代码.

摘自博客:

BlockingCollection的GetConsumingEnumerable实现正在使用BlockingCollection的内部同步,它已经同时支持多个使用者,但ForEach不知道这一点,并且其可枚举分区逻辑在访问可枚举时也需要锁定.

因此,这里的同步比实际需要的更多,导致潜在的不可忽略的性能损失.

[另外] Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块) ,然后释放锁.

虽然这种设计可以帮助提高整体吞吐量,但对于更注重低延迟的场景,这种分块可能会让人望而却步.