Thread safety of Azure TableQuery with Parallel.ForEach

Pau*_*ert 8 c# azure parallel-extensions azure-table-storage

I have some basic Azure tables that I have been querying serially:

var query = new TableQuery<DynamicTableEntity>()
  .Where(TableQuery.GenerateFilterCondition("PartitionKey",
    QueryComparisons.Equal, myPartitionKey));

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
  // Process entity here.
}

To speed this up, I parallelized it like so:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
  // Process entity here in a thread-safe manner.

  // Edited to add: Details of the loop body below:

  // This is the essence of the fixed loop body:
  lock (myLock) {
    DataRow myRow = myDataTable.NewRow();
    // [Add entity data to myRow.]
    myDataTable.Rows.Add(myRow);
  }

  // Old code (apparently not thread-safe, though NewRow() is supposed to create
  // a DataRow based on the table's schema without changing the table state):
  /*
    DataRow myRow = myDataTable.NewRow();
    lock (myLock) {
      // [Add entity data to myRow.]
      myDataTable.Rows.Add(myRow);
    }
  */
});

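This produces a significant speedup, but the results tend to differ slightly between runs (i.e., a few of the entities occasionally differ, even though the number of entities returned is exactly the same).

From this and some web searching, I conclude that the enumerator above is not always thread-safe. The documentation appears to suggest that thread safety is guaranteed only if the table objects are public static, but that has made no difference for me.

Could someone suggest how to resolve this? Is there a standard pattern for parallelizing Azure table queries?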

小智 4

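Your comment is correct: DataTable is not suitable for concurrent operations involving mutation, and it is the source of the duplicate entries. Locking the DataTable object for row modification operations will resolve the issue: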

 lock (myTable)
 {
    DataRow myRow = myTable.NewRow();
    myRow.SetField<int>("c1", (int)value);
    myTable.Rows.Add(myRow);
 }

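Placing NewRow() outside the lock will intermittently result in duplicate row entries in the table, or in an "An unhandled exception of type 'System.ArgumentException' occurred in System.Data.dll" exception on the NewRow() line. For additional details and alternatives for concurrent DataTable usage, see "Thread safety for DataTable".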
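One possible alternative to locking around every row operation is to keep the DataTable out of the parallel loop entirely: collect results in a thread-safe collection, then fill the table on a single thread. The following is only a minimal sketch under stated assumptions. The names CollectThenFill and BuildTable are hypothetical, and the integer range stands in for the entities returned by the table query.

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

static class CollectThenFill
{
    // Hypothetical helper: processes items in parallel, then fills the table serially.
    public static DataTable BuildTable(int itemCount)
    {
        var table = new DataTable();
        table.Columns.Add("c1", typeof(int));

        // Parallel phase: no DataTable access here, only a thread-safe collection.
        var results = new ConcurrentBag<int>();
        Parallel.ForEach(Enumerable.Range(0, itemCount), i =>
        {
            results.Add(i); // stand-in for per-entity processing (assumption)
        });

        // Serial phase: one thread creates and adds rows, so no lock is needed.
        foreach (int value in results)
        {
            DataRow row = table.NewRow();
            row["c1"] = value;
            table.Rows.Add(row);
        }
        return table;
    }

    static void Main()
    {
        DataTable table = BuildTable(100);
        int distinct = table.Rows.Cast<DataRow>()
            .Select(r => (int)r["c1"]).Distinct().Count();
        Console.WriteLine($"rows={table.Rows.Count}, distinct={distinct}");
    }
}
```

Note that ConcurrentBag does not preserve insertion order, so the row order may differ between runs even though the set of rows is the same. Whether this is faster than locking depends on how expensive the per-entity processing is.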

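To reproduce the failure caused by calling NewRow() outside the lock, use this code. Some runs will be clean, some will contain duplicate entries, and some will hit the exception.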

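   using System;
   using System.Collections.Generic;
   using System.Data;      // Field<T>/SetField<T> also need a reference to System.Data.DataSetExtensions
   using System.Threading;

   class Program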
   {
      static DataTable myTable = GetTable();
      static ManualResetEvent waitHandle = new ManualResetEvent(false);

      static void Main(string[] args)
      {
         const int threadCount = 10;
         List<Thread> threads = new List<System.Threading.Thread>();
         for (int i = 0; i < threadCount; ++i) 
         {
            threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread)));
            threads[i].Start(i);
         }
         waitHandle.Set(); // Release all the threads at once
         for (int i = 0; i < threadCount; ++i) 
         {
            threads[i].Join();
         }

         // Print results once threads return
         for (int i = 0; i < myTable.Rows.Count; ++i)
         {
            Console.WriteLine(myTable.Rows[i].Field<int>(0));
         }
         Console.WriteLine("---Processing Complete---");
         Console.ReadKey();
      }

      static void AddRowThread(object value)
      {
         waitHandle.WaitOne();
         DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS
         lock (myTable)
         {
            //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE
            myRow.SetField<int>("c1", (int)value);
            myTable.Rows.Add(myRow);
         }
      }

      static DataTable GetTable()
      {
         // Create a DataTable with a single int column, "c1".
         DataTable table = new DataTable();
         table.Columns.Add("c1", typeof(int));       
         return table;
      }
   }
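For comparison, here is a rough sketch of the corrected pattern driven by Parallel.For instead of raw threads, with a check that no duplicate values end up in the table. The names LockedInsert and AddRowsInParallel are hypothetical, and the dedicated lock object is an assumption of this sketch (the code above locks on the table itself, which also works as long as every writer uses the same lock).

```csharp
using System;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

static class LockedInsert
{
    // Hypothetical helper: adds one row per index, with NewRow() inside the lock.
    public static DataTable AddRowsInParallel(int rowCount)
    {
        var table = new DataTable();
        table.Columns.Add("c1", typeof(int));
        var gate = new object(); // dedicated lock object (assumption of this sketch)

        Parallel.For(0, rowCount, i =>
        {
            // NewRow(), the field assignment, and Rows.Add() all happen under the lock.
            lock (gate)
            {
                DataRow row = table.NewRow();
                row["c1"] = i;
                table.Rows.Add(row);
            }
        });
        return table;
    }

    static void Main()
    {
        DataTable table = AddRowsInParallel(1000);
        int distinct = table.Rows.Cast<DataRow>()
            .Select(r => (int)r["c1"]).Distinct().Count();
        // With the lock covering NewRow(), both numbers should equal the row count requested.
        Console.WriteLine($"rows={table.Rows.Count}, distinct={distinct}");
    }
}
```

Because all table access is serialized, the parallel loop only pays off when the work done outside the lock (omitted here) dominates the time spent inside it.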