使用 ConcurrentBag 的 Parallel ForEach 无法按预期工作

zig*_*zig 2 c# parallel-processing concurrency parallel.foreach

我有这个处理列表中项目的代码:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
      }
Run Code Online (Sandbox Code Playgroud)

我基本上是使用ConcurrentBag来根据几个条件检查对象是否存在。
希望总是得到这样的输出(顺序可能会有所不同):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
Run Code Online (Sandbox Code Playgroud)

有时得到一个输出,这表明我的代码不是线程安全的:

Processing item [Three]
Processing item [One]
Processing item [Two]
Run Code Online (Sandbox Code Playgroud)

所以我认为itemsProcess.FirstOrDefault()会阻塞的假设是错误的。
使用lock不会改变任何东西。显然,这里出了点问题,我真的不明白为什么?

我知道我可以通过其他方式“解决”这个问题(一种是在进入之前准备列表Parallel.ForEach()),但我真的很想知道为什么会出现这种行为?

Joh*_*lay 5

您的并行循环中有 2 个独立的操作:FirstOrDefaultAdd.

ConcurrentBag 无法确保这两个操作之间的线程安全。

另一种方法是ConcurrentDictionary,它有一个GetOrAdd方法,当键不存在时,它只会添加一个项目:

var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});
Run Code Online (Sandbox Code Playgroud)

如果您随后需要将处理过的项目作为ICollection,则可以通过 访问它们itemsProcess.Values