goo*_*ate 5 c# concurrency multithreading thread-safety task-parallel-library
本文的底部描述了如何使用GetOrAdd(如果我理解正确)可能导致损坏/意外结果.
喀嚓/
ConcurrentDictionary专为多线程场景而设计.您不必在代码中使用锁来添加或删除集合中的项.但是,一个线程始终可以检索一个值,而另一个线程可以通过为相同的键提供一个新值来立即更新该集合.
此外,尽管ConcurrentDictionary的所有方法都是线程安全的,但并非所有方法都是原子方法,特别是GetOrAdd和AddOrUpdate.传递给这些方法的用户委托是在字典的内部锁之外调用的.(这样做是为了防止未知代码阻塞所有线程.)因此,可能会发生以下事件序列:
1)threadA调用GetOrAdd,找不到任何项,并通过调用valueFactory委托创建一个新项添加到Add.
2)threadB同时调用GetOrAdd,调用其valueFactory委托,并在threadA之前到达内部锁,因此将其新的键值对添加到字典中.
3)threadA的用户委托完成,线程到达锁,但现在看到该项已经存在
4)threadA执行"Get",并返回先前由threadB添加的数据.
因此,无法保证GetOrAdd返回的数据与线程的valueFactory创建的数据相同.调用AddOrUpdate时,可能会发生类似的事件序列.
题
验证数据的正确方法是什么,并重试更新?一个很好的方法是根据旧值的内容尝试/重试此操作的扩展方法.
这将如何实施?我可以依赖result(verify)作为有效结束状态,还是必须使用不同的方法重试并重新检索值?
码
更新值时,以下代码具有竞争条件.期望的行为是AddOrUpdateWithoutRetrieving()将以不同的方式递增各种值(使用++或Interlocked.Increment()).
我还想在一个单元中执行多个字段操作,如果先前的更新由于竞争条件而没有"占用",则重试更新.
运行代码,您将看到控制台中出现的每个值开始增加1,但每个值都会漂移,有些值会前后几次迭代.
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
Run Code Online (Sandbox Code Playgroud)
产量
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
Run Code Online (Sandbox Code Playgroud)
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
Run Code Online (Sandbox Code Playgroud)
在此代码中,"eid"应始终比查询计数多1,000,但在迭代中,两者之间的差异从1到7不等.这种不一致可能会导致某些应用程序失败或报告错误的数据.
小智 5
此提交基于对文章底部备注的错误理解 \xe2\x80\x9c如何:从 ConcurrentDictionary 添加和删除项目\xe2\x80\x9d http://msdn.microsoft.com/en -us/library/dd997369.aspx以及关于共享对象的基本并发错误 \xe2\x80\x93 并发非原子修改。
\n首先,让\xe2\x80\x99s 澄清链接文章的真正含义。I\xe2\x80\x99ll 使用AddOrUpdate作为示例,但推理GetOrAdd是等效的。
假设您AddOrUpdate从多个线程调用并指定相同的密钥。假设具有该键的条目已经存在。\n每个线程都会出现,注意已经存在具有指定键的条目,并且更新部分AddOrUpdate是相关的。\n这样做时,没有线程会锁定字典。相反,它将使用一些互锁指令来自动检查输入键是否存在。
因此,我们的几个线程都注意到密钥存在并且需要updateValueFactory调用。该代表被传递给AddOrUpdate; 它引用现有的键和值并返回更新值。现在,所有涉及的线程将同时调用工厂。然后它们都将以某种先前未知的顺序完成,并且每个线程将尝试使用原子操作(使用互锁指令)将现有值替换为刚刚计算的值。无法知道哪个线程将\xe2\x80\x9cwin\xe2\x80\x9d。获胜的线程将存储其计算值。其他人会注意到字典中的值不再是updateValueFactory作为参数传递给他们的值。为了响应这种认识,他们将放弃操作并丢弃刚刚计算的值。这正是您想要发生的事情。
接下来,让我们澄清为什么在运行此处列出的代码示例时会得到奇怪的值:
\n回想一下,updateValueFactory传递给的委托AddOrUpdate采用 REFERENCES 现有键和值并返回更新值。\n其AddOrUpdateWithoutRetrieving()方法中的代码示例开始直接对该引用执行操作。它不是创建新的替换值并修改它,而是修改existingVal字典 \xe2\x80\x93 中已有的对象 \xe2\x80\x93 的实例成员值,然后简单地返回该引用。它不是原子地 \xe2\x80\x93 它读取一些值,更新一些值,读取更多,更新更多。当然,我们在上面已经看到这种情况同时发生在多个线程上 \xe2\x80\x93 他们都修改相同的对象。难怪结果是在任何时候(当代码示例调用 时WriteLine),该对象都包含源自不同线程的成员实例值。
字典与此 \xe2\x80\x93 无关,代码只是修改在线程之间非原子共享的对象。这是最常见的并发错误之一。两种最常见的解决方法取决于具体情况。要么使用共享锁使整个对象修改原子化,要么先原子地复制整个对象,然后修改本地副本。
\n对于后者,尝试将其添加到TestData类中:
private Object _copyLock = null;\n \nprivate Object GetLock() {\n \n if (_copyLock != null)\n return _copyLock;\n \n Object newLock = new Object();\n Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);\n return (prevLock == null) ? newLock : prevLock;\n}\n \npublic TestData Copy() {\n \n lock (GetLock()) {\n TestData copy = new TestData();\n copy.aStr1 = this.aStr1;\n copy.aStr2 = this.aStr2;\n copy.aLong1 = this.aLong1;\n copy.aInt1 = this.aInt1;\n copy.QueryCount = this.QueryCount;\n copy.aDate1 = this.aDate1;\n copy.aDate2 = this.aDate2;\n copy.zData = this.zData;\n \n return copy;\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n然后修改工厂如下:
\nTestData verify = concurrentCache.AddOrUpdate(123, ci,\n (key, existingVal) =>\n {\n TestData newVal = existingVal.Copy();\n newVal.aStr2 = "test1" + newVal.QueryCount;\n newVal.aDate1 = DateTime.MinValue;\n Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount);\n Interlocked.Increment(ref newVal.QueryCount);\n System.Random RandNum = new System.Random();\n int MyRandomNumber = RandNum.Next(1, 1000);\n \n Thread.Sleep(MyRandomNumber);\n newVal.aInt1++;\n newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);\n Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount);\n return newVal;\n });\nRun Code Online (Sandbox Code Playgroud)\n我希望这有帮助。
\n| 归档时间: |
|
| 查看次数: |
1636 次 |
| 最近记录: |