Avoiding stale (logically corrupt) data when using "ConcurrentDictionary.GetOrAdd()", repro code included

goo*_*ate 5 c# concurrency multithreading thread-safety task-parallel-library

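The bottom of the article "How to: Add and Remove Items from a ConcurrentDictionary" describes how using GetOrAdd may (if I understand it correctly) lead to corrupted or unexpected results.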

snip/

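ConcurrentDictionary is designed for multithreaded scenarios. You do not have to use locks in your code to add or remove items from the collection. However, it is always possible for one thread to retrieve a value, and another thread to immediately update the collection by giving the same key a new value.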

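Also, although all methods of ConcurrentDictionary are thread-safe, not all methods are atomic, specifically GetOrAdd and AddOrUpdate. The user delegate that is passed to these methods is invoked outside of the dictionary's internal lock. (This is done to prevent unknown code from blocking all threads.) Therefore, the following sequence of events can occur: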

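1) threadA calls GetOrAdd, finds no item, and creates a new item to add by invoking the valueFactory delegate.

2) threadB calls GetOrAdd concurrently, its valueFactory delegate is invoked, and it arrives at the internal lock before threadA, so its new key-value pair is added to the dictionary.

3) threadA's user delegate completes, and the thread arrives at the lock, but now sees that the item already exists.

4) threadA performs a "Get" and returns the data that was previously added by threadB.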

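Therefore, it is not guaranteed that the data returned by GetOrAdd is the same data that was created by the thread's valueFactory. A similar sequence of events can occur when AddOrUpdate is called.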
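The sequence above can be reproduced deterministically. This is a sketch, not code from the article: GetOrAddRaceDemo and the Barrier-based setup are illustrative assumptions. The Barrier forces two threads to be inside valueFactory at the same time, so the factory runs twice, yet both GetOrAdd calls return the single instance that was actually stored.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Demo scaffolding (hypothetical): shows that GetOrAdd may run valueFactory
// more than once for the same key, while every caller still receives the
// one value that actually ended up in the dictionary.
public static class GetOrAddRaceDemo
{
    public static (int FactoryCalls, bool SameInstance) Run()
    {
        var cache = new ConcurrentDictionary<int, object>();
        using var bothInsideFactory = new Barrier(2); // forces the two factories to overlap
        int factoryCalls = 0;
        object resultA = null, resultB = null;

        Func<int, object> factory = key =>
        {
            Interlocked.Increment(ref factoryCalls);
            bothInsideFactory.SignalAndWait(); // neither factory finishes until both have started
            return new object();               // each call builds a distinct candidate value
        };

        // Dedicated threads, so both are guaranteed to run regardless of thread-pool sizing.
        var threadA = new Thread(() => resultA = cache.GetOrAdd(123, factory));
        var threadB = new Thread(() => resultB = cache.GetOrAdd(123, factory));
        threadA.Start();
        threadB.Start();
        threadA.Join();
        threadB.Join();

        // Two candidates were created, only one was stored; the losing thread
        // silently receives the winner's value instead of its own.
        return (factoryCalls, ReferenceEquals(resultA, resultB));
    }
}
```

Calling `GetOrAddRaceDemo.Run()` should report two factory calls and `SameInstance == true`: the "extra" value is simply discarded, which is harmless as long as creating it has no side effects.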

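What is the correct way to verify the data and retry the update? A nice approach would be an extension method that tries/retries this operation based on the contents of the old value.

How would this be implemented? Can I rely on the result (verify) as a valid end state, or must I retry and re-retrieve the value using a different method?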
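A minimal sketch of such a try/retry extension method, assuming values are treated as immutable (the update delegate returns a new value instead of modifying the old one). `AddOrUpdateWithRetry` is a hypothetical name, not a BCL method; it only uses the real `TryGetValue`, `TryAdd` and `TryUpdate` methods, and `TryUpdate` succeeds only if the stored value still equals the value the replacement was computed from.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of the BCL): an optimistic
// "read, compute, compare-and-swap, retry" loop.
public static class ConcurrentDictionaryRetryExtensions
{
    public static TValue AddOrUpdateWithRetry<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dict,
        TKey key,
        Func<TKey, TValue> addFactory,
        Func<TKey, TValue, TValue> updateFactory)
    {
        while (true)
        {
            if (dict.TryGetValue(key, out TValue current))
            {
                // Assumption: updateFactory builds a NEW value from `current`
                // and never mutates `current` itself.
                TValue updated = updateFactory(key, current);

                // Only succeeds if nobody replaced `current` in the meantime
                // (compared using EqualityComparer<TValue>.Default).
                if (dict.TryUpdate(key, updated, current))
                    return updated;
            }
            else
            {
                TValue added = addFactory(key);
                if (dict.TryAdd(key, added))
                    return added;
            }
            // Lost a race (value changed, key added or removed): recompute from fresh state.
        }
    }
}
```

This is roughly the same optimistic loop that AddOrUpdate already runs internally, so a helper like this only pays off if you need extra logic between reading the old value and swapping in the new one.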

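The following code has a race condition when updating values. The desired behavior is that AddOrUpdateWithoutRetrieving() increments various values in different ways (using ++ and Interlocked.Increment()).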

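I also want to perform several field operations as a single unit, and retry the update if a previous update didn't "take" because of a race condition.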

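Run the code and you will see that each value printed to the console starts out increasing by 1, but the values drift apart, with some running a few iterations ahead of or behind the others.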

namespace DictionaryHowTo
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // Repro harness. The dictionary's value type is the nested TestData class.
    class FilterConcurrentDuplicate
    {
        // Create a new concurrent dictionary.
        readonly ConcurrentDictionary<int, TestData> eventLogCache = 
             new ConcurrentDictionary<int, TestData>();

        static void Main()
        {
            FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();

            c.DoRace(null);
        }

        readonly ConcurrentDictionary<int, TestData> concurrentCache = 
            new ConcurrentDictionary<int, TestData>();
        void DoRace(string[] args)
        {
            int max = 1000;

            // Add some key/value pairs from multiple threads.
            Task[] tasks = new Task[3];

            tasks[0] = Task.Factory.StartNew(() =>
            {

                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 500);

                Thread.Sleep(MyRandomNumber);
                AddOrUpdateWithoutRetrieving();

            });

            tasks[1] = Task.Factory.StartNew(() =>
            {
                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 1000);

                Thread.Sleep(MyRandomNumber);

                AddOrUpdateWithoutRetrieving();

            });

            tasks[2] = Task.Factory.StartNew(() =>
            {
                AddOrUpdateWithoutRetrieving();

            });
            // Wait for the three concurrent updates, then run one more on this thread.
            Task.WaitAll(tasks);

            AddOrUpdateWithoutRetrieving();

            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }
        public class TestData : IEqualityComparer<TestData>
        {
            public string aStr1 { get; set; }
            public Guid? aGud1 { get; set; }
            public string aStr2 { get; set; }
            public int aInt1 { get; set; }
            public long? aLong1 { get; set; }

            public DateTime aDate1 { get; set; }
            public DateTime? aDate2 { get; set; }

            //public int QueryCount { get; set; }
            public int QueryCount = 0;//

            public string zData { get; set; }
            public bool Equals(TestData x, TestData y)
            {
                return x.aStr1 == y.aStr1 &&
                       x.aStr2 == y.aStr2 &&
                       x.aGud1 == y.aGud1 &&
                       x.aInt1 == y.aInt1 &&
                       x.aLong1 == y.aLong1 &&
                       x.aDate1 == y.aDate1 &&
                       x.QueryCount == y.QueryCount ;
            }

            public int GetHashCode(TestData obj)
            {
                TestData ci = (TestData)obj;
                // http://stackoverflow.com/a/263416/328397
                return 
                  new { 
                         A = ci.aStr1, 
                         Aa = ci.aStr2, 
                         B = ci.aGud1, 
                         D = ci.aInt1, 
                         E = ci.aLong1, 
                         F = ci.QueryCount , 
                         G = ci.aDate1}.GetHashCode();
            }
        }
        private   void AddOrUpdateWithoutRetrieving()
        {
            // Sometime later. We receive new data from some source.
            TestData ci = new TestData() 
            { 
              aStr1 = "Austin", 
              aGud1 = new Guid(), 
              aStr2 = "System", 
              aLong1 = 100, 
              aInt1 = 1000, 
              QueryCount = 0, 
              aDate1 = DateTime.MinValue
            };

            TestData verify = concurrentCache.AddOrUpdate(123, ci,
                (key, existingVal) =>
                {
                    existingVal.aStr2 = "test1" + existingVal.QueryCount;
                    existingVal.aDate1 = DateTime.MinValue;
                    Console.WriteLine
                     ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
                          "  Query Count A:" + existingVal.QueryCount);
                    Interlocked.Increment(ref existingVal.QueryCount);
                    System.Random RandNum = new System.Random();
                    int MyRandomNumber = RandNum.Next(1, 1000);

                    Thread.Sleep(MyRandomNumber);
                    existingVal.aInt1++;
                    existingVal.aDate1 = 
                         existingVal.aDate1.AddSeconds
                         (existingVal.aInt1);  
                    Console.WriteLine(
                          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
                           "  Query Count B:" + existingVal.QueryCount);
                    return existingVal;
                });


            // After each run, every value here should be ++ the previous value
            Console.WriteLine(
                "Thread:"+Thread.CurrentThread.ManagedThreadId + 
                 ": Query Count returned:" + verify.QueryCount + 
                 " eid:" + verify.aInt1 + " date:" +  
                 verify.aDate1.Hour + " "  + verify.aDate1.Second + 
                 " NAME:" + verify.aStr2
                );
        }

    }
}

Output:

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System

Thread:12  Query Count A:0
Thread:13  Query Count A:1
Thread:12  Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11

Thread:12  Query Count A:2
Thread:13  Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12

Thread:13  Query Count A:3
Thread:11  Query Count A:4
Thread:11  Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14

Thread:11  Query Count A:5
Thread:13  Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15

....

Thread:11  Query Count A:658
Thread:11  Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658

Thread:11  Query Count A:659
Thread:11  Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659

Thread:11  Query Count A:660
Thread:11  Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660

Thread:11  Query Count A:661
Thread:11  Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661

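In this code, "eid" should always be 1,000 more than the query count, but over the iterations the difference between the two varies from 1 to 7. This kind of inconsistency could cause some applications to fail or to report incorrect data.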

小智 5

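This question is based on a misunderstanding of the remarks at the bottom of the article "How to: Add and Remove Items from a ConcurrentDictionary" (http://msdn.microsoft.com/en-us/library/dd997369.aspx), combined with a basic concurrency bug: concurrent, non-atomic modification of a shared object.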


First, let's clarify what the linked article actually means. I'll use AddOrUpdate as the example, but the reasoning for GetOrAdd is equivalent.


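Suppose you call AddOrUpdate from several threads with the same key, and an entry with that key already exists. Each thread comes along, notices that an entry with the specified key is already present, and that the update part of AddOrUpdate therefore applies. None of the threads holds a lock on the dictionary while doing this; the check for an existing entry is lock-free.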


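So several of our threads notice that the key exists and that updateValueFactory must be called. That delegate is passed to AddOrUpdate; it receives references to the existing key and value and returns the updated value. All of the threads involved now call the factory concurrently. They finish in some unpredictable order, and each thread then tries to atomically replace the existing value with the one it just computed, succeeding only if the stored value is still the one that was passed to its factory. There is no way to know which thread will "win". The winning thread stores its computed value. The others notice that the value in the dictionary is no longer the one that was passed to their updateValueFactory as an argument. In response, they discard the value they just computed and try again, calling updateValueFactory with the value that is now stored. This is exactly what you want to happen.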
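The retry behavior can be observed with a small experiment. This is a sketch, not part of the original answer; the demo class and the call counting are illustrative. Because the update delegate only returns `existing + 1` and never mutates anything, N concurrent AddOrUpdate calls always end at exactly N, while the delegate itself may run more often than that, since losing threads recompute.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Demo scaffolding (hypothetical): a pure update delegate plus concurrent
// AddOrUpdate calls loses no updates, even though the delegate may be
// invoked more times than there are successful updates.
public static class AddOrUpdateRetryDemo
{
    public static (int FinalValue, int FactoryCalls) Run(int calls)
    {
        var counters = new ConcurrentDictionary<int, int>();
        int factoryCalls = 0;

        Parallel.For(0, calls, _ =>
        {
            counters.AddOrUpdate(
                123,
                1,                          // stored when the key is absent
                (key, existing) =>
                {
                    Interlocked.Increment(ref factoryCalls);
                    return existing + 1;    // pure: computes a new value, mutates nothing
                });
        });

        return (counters[123], factoryCalls);
    }
}
```

Exactly one call takes the "add" path, so the update delegate runs at least `calls - 1` times; any extra invocations are retries by threads that lost the race.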


Next, let's clarify why you get strange values when you run the code sample listed here:


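Recall that the updateValueFactory delegate passed to AddOrUpdate receives REFERENCES to the existing key and value and returns the updated value. The code in the sample's AddOrUpdateWithoutRetrieving() method operates directly on that reference. Instead of creating a new replacement value and modifying it, it modifies the instance members of existingVal, the object that is already in the dictionary, and then simply returns that same reference. It does this non-atomically: it reads some values, updates some values, reads more, updates more. And as we saw above, this happens concurrently on several threads, all modifying the same object. Because every factory returns the very object that is still stored, the "has the value changed?" check always succeeds, so no thread ever retries. No wonder that at any given moment (whenever the code sample calls WriteLine), the object contains member values originating from different threads.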
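To see the problem in isolation, here is a deterministic sketch; `Pair`, `TornReadDemo` and the two events are hypothetical demo scaffolding, not code from the question. The update delegate mutates the stored object in place and is paused halfway, and a second thread reading the dictionary at that moment sees `A` already incremented but `B` not yet, a state no single update intended.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Demo scaffolding (hypothetical): mutating existingVal inside updateValueFactory
// edits the object that is already stored, so other threads can observe a
// half-finished update.
public static class TornReadDemo
{
    public sealed class Pair { public int A; public int B; }

    public static (int SeenA, int SeenB) Run()
    {
        var cache = new ConcurrentDictionary<int, Pair>();
        cache[123] = new Pair();

        using var firstFieldWritten = new ManualResetEventSlim(false);
        using var readerDone = new ManualResetEventSlim(false);
        int seenA = -1, seenB = -1;

        var writer = new Thread(() =>
            cache.AddOrUpdate(123, new Pair(), (key, existing) =>
            {
                existing.A++;               // in-place edit of the stored object...
                firstFieldWritten.Set();
                readerDone.Wait();          // ...paused halfway for the demo
                existing.B++;
                return existing;            // same reference, so the update never "loses"
            }));

        var reader = new Thread(() =>
        {
            firstFieldWritten.Wait();
            Pair snapshot = cache[123];     // the very object the writer is editing
            seenA = snapshot.A;
            seenB = snapshot.B;
            readerDone.Set();
        });

        writer.Start();
        reader.Start();
        writer.Join();
        reader.Join();
        return (seenA, seenB);
    }
}
```

`TornReadDemo.Run()` returns `(1, 0)`: the reader saw one field from the in-progress update and one from the previous state, which is the same kind of mix that shows up in the question's console output.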


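The dictionary has nothing to do with this: the code is simply modifying an object that is shared between threads, and doing so non-atomically. This is one of the most common concurrency bugs. There are two common fixes, and which one fits depends on the situation. Either use a shared lock to make the entire modification of the object atomic, or first atomically copy the entire object and then modify the local copy.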
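For the first option, here is a minimal sketch of the shared-lock approach, assuming the related fields are moved behind one lock inside the value object. `Counters` is a hypothetical stand-in for two of TestData's fields (QueryCount and aInt1), and `SharedLockDemo` is demo scaffolding. Readers must take the same lock; otherwise they can still observe a half-finished update. The dictionary's only job here is handing out the shared instance.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for two of TestData's fields.
public sealed class Counters
{
    private readonly object _gate = new object();
    private int _queryCount;      // starts at 0, like QueryCount
    private int _aInt1 = 1000;    // starts at 1000, like aInt1 in the question

    // Both fields change together under one lock, so no thread sees a mix.
    public void Increment()
    {
        lock (_gate)
        {
            _queryCount++;
            _aInt1++;
        }
    }

    // Readers take the same lock and always get a consistent pair.
    public (int QueryCount, int AInt1) Snapshot()
    {
        lock (_gate)
        {
            return (_queryCount, _aInt1);
        }
    }
}

public static class SharedLockDemo
{
    public static (int QueryCount, int AInt1) Run(int updates)
    {
        var cache = new ConcurrentDictionary<int, Counters>();
        Parallel.For(0, updates, i =>
        {
            // GetOrAdd may build a spare Counters, but every caller gets the stored one.
            cache.GetOrAdd(123, key => new Counters()).Increment();
        });
        return cache[123].Snapshot();
    }
}
```

With this design every snapshot has AInt1 exactly 1,000 more than QueryCount, which is the invariant the question expected from "eid".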


For the latter, try adding this to the TestData class:

// Lazily creates a single lock object per instance; CompareExchange makes
// racing threads agree on whichever lock object was published first.
private Object _copyLock = null;

private Object GetLock() {
    if (_copyLock != null)
        return _copyLock;

    Object newLock = new Object();
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);
    return (prevLock == null) ? newLock : prevLock;
}

// Field-by-field copy. The lock only serializes Copy() calls, so this relies on
// instances stored in the dictionary never being mutated after they are published.
public TestData Copy() {
    lock (GetLock()) {
        TestData copy = new TestData();
        copy.aStr1 = this.aStr1;
        copy.aGud1 = this.aGud1;
        copy.aStr2 = this.aStr2;
        copy.aLong1 = this.aLong1;
        copy.aInt1 = this.aInt1;
        copy.QueryCount = this.QueryCount;
        copy.aDate1 = this.aDate1;
        copy.aDate2 = this.aDate2;
        copy.zData = this.zData;

        return copy;
    }
}

Then change the factory as follows:

TestData verify = concurrentCache.AddOrUpdate(123, ci,
    (key, existingVal) =>
    {
        // Build the replacement from a private copy; existingVal, which is
        // still stored in the dictionary, is never modified.
        TestData newVal = existingVal.Copy();
        newVal.aStr2 = "test1" + newVal.QueryCount;
        newVal.aDate1 = DateTime.MinValue;
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count A:" + newVal.QueryCount);
        Interlocked.Increment(ref newVal.QueryCount);
        System.Random RandNum = new System.Random();
        int MyRandomNumber = RandNum.Next(1, 1000);

        Thread.Sleep(MyRandomNumber);
        newVal.aInt1++;
        newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count B:" + newVal.QueryCount);
        return newVal;
    });
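If you can target C# 9 or later, a record with a `with` expression is another way to express the same copy-then-modify idea; this is a swapped-in technique, not part of the original answer. `TestDataSnapshot` is a hypothetical immutable stand-in for a few of TestData's fields, and `ImmutableUpdateDemo` is demo scaffolding. Because instances cannot be modified after construction, the update delegate has no way to touch the stored object, and AddOrUpdate's built-in retry handles the races.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical immutable stand-in for a few of TestData's fields.
public sealed record TestDataSnapshot(string AStr2, int AInt1, int QueryCount);

public static class ImmutableUpdateDemo
{
    public static TestDataSnapshot Run(int updates)
    {
        var cache = new ConcurrentDictionary<int, TestDataSnapshot>();
        cache[123] = new TestDataSnapshot("System", 1000, 0);

        Parallel.For(0, updates, i =>
        {
            cache.AddOrUpdate(
                123,
                new TestDataSnapshot("System", 1000, 0),  // unused here: the key already exists
                (key, existing) => existing with           // `with` always produces a new instance
                {
                    AStr2 = "test1" + existing.QueryCount,
                    QueryCount = existing.QueryCount + 1,
                    AInt1 = existing.AInt1 + 1
                });
        });

        return cache[123];
    }
}
```

After 1,000 updates the stored record has QueryCount 1000 and AInt1 2000, and every intermediate record that any thread ever saw was internally consistent.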

I hope this helps.
