zig*_*zig 2 c# parallel-processing concurrency parallel.foreach
我有这个处理列表中项目的代码:
static readonly object _Lock = new object();
public class Item
{
public string Name;
public string ID;
}
static void Main(string[] args)
{
var items = new List<Item>
{
new Item { Name = "One", ID = "123" },
new Item { Name = "Two", ID = "234" },
new Item { Name = "Three", ID = "123" }
};
var itemsProcess = new ConcurrentBag<Item>();
Parallel.ForEach(items, (item) =>
{
Item itemProcess = null;
// lock (_Lock)
{
itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
}
if (itemProcess != null)
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
itemsProcess.Add(item);
Console.WriteLine($"Processing item [{item.Name}]");
Thread.Sleep(1000); // do some work...
}
});
Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)
我基本上是使用ConcurrentBag来根据几个条件检查对象是否存在。
我希望总是得到这样的输出(顺序可能会有所不同):
Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]
Run Code Online (Sandbox Code Playgroud)
但有时我会得到一个输出,这表明我的代码不是线程安全的:
Processing item [Three]
Processing item [One]
Processing item [Two]
Run Code Online (Sandbox Code Playgroud)
所以我认为itemsProcess.FirstOrDefault()会阻塞的假设是错误的。
使用lock不会改变任何东西。显然,这里出了点问题,我真的不明白为什么?
我知道我可以通过其他方式“解决”这个问题(一种是在进入之前准备列表Parallel.ForEach()),但我真的很想知道为什么会出现这种行为?
您的并行循环中有 2 个独立的操作:FirstOrDefault和Add.
ConcurrentBag 无法确保这两个操作之间的线程安全。
另一种方法是ConcurrentDictionary,它有一个GetOrAdd方法,当键不存在时,它只会添加一个项目:
var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
// Returns existing item with same ID or adds this item
var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
if (!object.ReferenceEquals(item, itemProcess))
{
Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
}
else
{
Console.WriteLine($"Processing item [{item.Name}]");
// do some work...
}
});
Run Code Online (Sandbox Code Playgroud)
如果您随后需要将处理过的项目作为ICollection,则可以通过 访问它们itemsProcess.Values。