SQL Server数据库调用的多线程C#应用程序

Bar*_*rka 23 c# sql architecture sql-server multithreading

我有一个SQL Server数据库,表中有500,000条记录main.还有其他三个表叫child1,child2child3.很多之间一对多的关系child1,child2,child3,并main通过三个关系表来实现:main_child1_relationship,main_child2_relationship,和main_child3_relationship.我需要读取记录main,更新main,并在关系表中插入新行以及在子表中插入新记录.子表中的记录具有唯一性约束,因此实际计算的伪代码(CalculateDetails)将类似于:

for each record in main
{
   find its child1 like qualities
   for each one of its child1 qualities
   {
      find the record in child1 that matches that quality
      if found
      {
          add a record to main_child1_relationship to connect the two records
      }
      else
      {
          create a new record in child1 for the quality mentioned
          add a record to main_child1_relationship to connect the two records
      }
   }
   ...repeat the above for child2
   ...repeat the above for child3 
}
Run Code Online (Sandbox Code Playgroud)

这可以作为单线程应用程序正常工作.但它太慢了.C#中的处理非常繁重,耗时太长.我想把它变成一个多线程的应用程序.

做这个的最好方式是什么?我们正在使用Linq to Sql.

到目前为止,我的方法是DataContext为每批记录创建一个新对象,main并用ThreadPool.QueueUserWorkItem它来处理它.然而,这些批次踩到彼此的脚趾,因为一个线程添加一个记录,然后下一个线程尝试添加相同的一个...我得到各种有趣的SQL Server死锁.

这是代码:

    int skip = 0;
    List<int> thisBatch;
    Queue<List<int>> allBatches = new Queue<List<int>>();
    do
    {
        thisBatch = allIds
                .Skip(skip)
                .Take(numberOfRecordsToPullFromDBAtATime).ToList();
        allBatches.Enqueue(thisBatch);
        skip += numberOfRecordsToPullFromDBAtATime;

    } while (thisBatch.Count() > 0);

    while (allBatches.Count() > 0)
    {
        RRDataContext rrdc = new RRDataContext();

        var currentBatch = allBatches.Dequeue();
        lock (locker)  
        {
            runningTasks++;
        }
        System.Threading.ThreadPool.QueueUserWorkItem(x =>
                    ProcessBatch(currentBatch, rrdc));

        lock (locker) 
        {
            while (runningTasks > MAX_NUMBER_OF_THREADS)
            {
                 Monitor.Wait(locker);
                 UpdateGUI();
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

这是ProcessBatch:

    private static void ProcessBatch( 
        List<int> currentBatch, RRDataContext rrdc)
    {
        var topRecords = GetTopRecords(rrdc, currentBatch);
        CalculateDetails(rrdc, topRecords);
        rrdc.Dispose();

        lock (locker)
        {
            runningTasks--;
            Monitor.Pulse(locker);
        };
    }
Run Code Online (Sandbox Code Playgroud)

    private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc, 
                                              List<int> thisBatch)
    {
        List<Record> topRecords;

        topRecords = rrdc.Records
                    .Where(x => thisBatch.Contains(x.Id))
                    .OrderBy(x => x.OrderByMe).ToList();
        return topRecords;
    }
Run Code Online (Sandbox Code Playgroud)

CalculateDetails 最好的解释是顶部的伪代码.

我认为必须有更好的方法来做到这一点.请帮忙.非常感谢!

Phi*_*hil 50

这是我对这个问题的看法:

  • 当使用多个线程在SQL Server或任何数据库中插入/更新/查询数据时,死锁是生活中的事实.你必须假设它们会发生并适当地处理它们.

  • 事实并非如此,我们不应该试图限制死锁的发生.但是,很容易阅读死锁的基本原因并采取措施防止它们,但SQL Server总是会让你大吃一惊:-)

死锁的一些原因:

  • 线程太多 - 尝试将线程数限制到最小,但当然我们需要更多线程来获得最大性能.

  • 没有足够的索引.如果选择和更新没有足够的选择性,那么SQL将获取比健康更大的范围锁.尝试指定适当的索引.

  • 索引太多了.更新索引会导致死锁,因此请尝试将索引减少到所需的最小值.

  • 交易隔离级别太高.使用.NET时的默认隔离级别是"Serializable",而使用SQL Server 的默认隔离级别是"Read Committed".降低隔离级别可以帮助很多(当然,如果适当的话).

这就是我可以解决你的问题的方法:

  • 我不会推出自己的线程解决方案,我会使用TaskParallel库.我的主要方法看起来像这样:

    using (var dc = new TestDataContext())
    {
        // Get all the ids of interest.
        // I assume you mark successfully updated rows in some way
        // in the update transaction.
        List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList();
    
        var problematicIds = new List<ErrorType>();
    
        // Either allow the TaskParallel library to select what it considers
        // as the optimum degree of parallelism by omitting the 
        // ParallelOptions parameter, or specify what you want.
        Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8},
                            id => CalculateDetails(id, problematicIds));
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 执行CalculateDetails方法并重试死锁失败

    private static void CalculateDetails(int id, List<ErrorType> problematicIds)
    {
        try
        {
            // Handle deadlocks
            DeadlockRetryHelper.Execute(() => CalculateDetails(id));
        }
        catch (Exception e)
        {
            // Too many deadlock retries (or other exception). 
            // Record so we can diagnose problem or retry later
            problematicIds.Add(new ErrorType(id, e));
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 核心CalculateDetails方法

    private static void CalculateDetails(int id)
    {
        // Creating a new DeviceContext is not expensive.
        // No need to create outside of this method.
        using (var dc = new TestDataContext())
        {
            // TODO: adjust IsolationLevel to minimize deadlocks
            // If you don't need to change the isolation level 
            // then you can remove the TransactionScope altogether
            using (var scope = new TransactionScope(
                TransactionScopeOption.Required,
                new TransactionOptions {IsolationLevel = IsolationLevel.Serializable}))
            {
                TestItem item = dc.TestItems.Single(i => i.Id == id);
    
                // work done here
    
                dc.SubmitChanges();
                scope.Complete();
            }
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 当然我执行死锁重试帮助器

    public static class DeadlockRetryHelper
    {
        private const int MaxRetries = 4;
        private const int SqlDeadlock = 1205;
    
        public static void Execute(Action action, int maxRetries = MaxRetries)
        {
            if (HasAmbientTransaction())
            {
                // Deadlock blows out containing transaction
                // so no point retrying if already in tx.
                action();
            }
    
            int retries = 0;
    
            while (retries < maxRetries)
            {
                try
                {
                    action();
                    return;
                }
                catch (Exception e)
                {
                    if (IsSqlDeadlock(e))
                    {
                        retries++;
                        // Delay subsequent retries - not sure if this helps or not
                        Thread.Sleep(100 * retries);
                    }
                    else
                    {
                        throw;
                    }
                }
            }
    
            action();
        }
    
        private static bool HasAmbientTransaction()
        {
            return Transaction.Current != null;
        }
    
        private static bool IsSqlDeadlock(Exception exception)
        {
            if (exception == null)
            {
                return false;
            }
    
            var sqlException = exception as SqlException;
    
            if (sqlException != null && sqlException.Number == SqlDeadlock)
            {
                return true;
            }
    
            if (exception.InnerException != null)
            {
                return IsSqlDeadlock(exception.InnerException);
            }
    
            return false;
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  • 另一种可能性是使用分区策略

如果您的表可以自然地分成几个不同的数据集,那么您可以使用SQL Server分区表和索引,也可以手动将现有表拆分为多组表.我建议使用SQL Server的分区,因为第二个选项会很混乱.此外,内置分区仅适用于SQL Enterprise Edition.

如果你可以进行分区,你可以选择一个分裂方案来打破你的数据,比如8个不同的集合.现在您可以使用原始的单线程代码,但每个目标有一个单独的分区.现在不会有任何(或至少是最小数量的)死锁.

我希望这是有道理的.


Ben*_*Ben 6

概观

您的问题的根源是L2S DataContext,如Entity Framework的ObjectContext,不是线程安全的.正如在MSDN论坛交流中所解释的那样,从.NET 4.0开始,对.NET ORM解决方案中的异步操作的支持仍然悬而未决; 你必须推出自己的解决方案,正如你所发现的那样,当你的框架采用单线程时,这并不总是很容易.

我将借此机会指出,L2S建立在ADO.NET之上,ADO.NET本身完全支持异步操作 - 就个人而言,我更倾向于直接处理较低层并自己编写SQL,以确保我完全理解网络上发生的事情.

SQL Server解决方案?

话虽如此,我不得不问 - 这必须是C#解决方案吗?如果你可以从一组插入/更新语句中编写解决方案,你可以直接发送SQL,你的线程和性能问题就会消失.*在我看来,你的问题与实际的数据转换无关.制作,但围绕使他们从.NET的表现.如果从等式中删除.NET,则任务变得更简单.毕竟,最好的解决方案通常是你编写最少量代码的解决方案,对吧?;)

即使你的更新/插入逻辑不能以严格的设置 - 关系方式表达,SQL Server确实有一个用于迭代记录和执行逻辑的内置机制 - 虽然它们在许多用例中被公正地诽谤,但是游标可能在事实上适合你的任务.

如果这是一个必须重复发生的任务,那么将其编码为存储过程可以大大受益.

*当然,长时间运行的SQL带来了自己的问题,比如锁定升级和索引使用,你将不得不面对这些问题.

C#解决方案

当然,可能在SQL中执行此操作是不可能的 - 例如,您的代码决策可能依赖于来自其他地方的数据,或者您的项目可能具有严格的"不允许SQL"约定.你提到了一些典型的多线程错误,但是如果没有看到你的代码,我就无法真正对它们有所帮助.

从C#执行此操作显然是可行的,但您需要处理这样一个事实:每次调用都会存在固定数量的延迟.您可以通过使用池化连接,启用多个活动结果集以及使用异步Begin/End方法来执行查询来缓解网络延迟的影响.即使有了所有这些,您仍然必须接受将数据从SQL Server发送到您的应用程序的成本.

保持代码不受限制的最好方法之一是尽可能避免在线程之间共享可变数据.这意味着不跨多个线程共享相同的DataContext.下一个最好的方法是锁定触摸共享数据的关键代码段 - lock围绕所有DataContext访问,从第一次读取到最终写入.这种方法可能完全消除了多线程的好处; 你可能会使你的锁定更精细,但你要警告这是一条痛苦的道路.

更好的办法是让您的运营完全分开.如果你可以在"主要"记录中划分你的逻辑,这是理想的 - 也就是说,只要各个子表之间没有关系,并且只要"main"中的一个记录没有影响另外,您可以跨多个线程拆分操作,如下所示:

private IList<int> GetMainIds()
{
    using (var context = new MyDataContext())
        return context.Main.Select(m => m.Id).ToList();
}

private void FixUpSingleRecord(int mainRecordId)
{
    using (var localContext = new MyDataContext())
    {
        var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId);

        if (main == null)
            return;

        foreach (var childOneQuality in main.ChildOneQualities)
        {
            // If child one is not found, create it
            // Create the relationship if needed
        }

        // Repeat for ChildTwo and ChildThree

        localContext.SaveChanges();
    }
}

public void FixUpMain()
{
    var ids = GetMainIds();
    foreach (var id in ids)
    {
        var localId = id; // Avoid closing over an iteration member
        ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) });
    }
}
Run Code Online (Sandbox Code Playgroud)

显然,这与您的问题中的伪代码一样是一个玩具示例,但希望它能让您考虑如何确定任务的范围,使得它们之间没有(或最小)共享状态.我认为,这将是正确的C#解决方案的关键.

编辑响应更新和评论

如果您看到数据一致性问题,我建议强制执行事务语义 - 您可以使用System.Transactions.TransactionScope(添加对System.Transactions的引用)来执行此操作.或者,您可以通过访问内部连接并调用BeginTransaction它(或调用任何DataConnection方法)在ADO.NET级别上执行此操作.

你还提到了死锁.您正在与SQL Server死锁作斗争表明实际的SQL查询正在踩到彼此的脚趾.在不知道实际通过网络发送什么的情况下,很难详细说明发生了什么以及如何解决它.可以说SQL死锁是由SQL查询引起的,而不一定是来自C#线程构造 - 你需要检查究竟是通过线路进行的.我的直觉告诉我,如果每个'main'记录真正独立于其他记录,那么就不需要行和表锁,并且Linq to SQL可能是这里的罪魁祸首.

通过将DataContext.Log属性设置为例如Console.Out,您可以在代码中获取L2S发出的原始SQL的转储.虽然我从未亲自使用它,但我知道LINQPad提供了L2S设施,你也可以在那里获得SQL.

SQL Server Management Studio将为您提供剩余的工作 - 使用活动监视器,您可以实时监视锁定升级.使用查询分析器,您可以查看SQL Server将如何执行查询.有了这些,你应该能够很好地理解你的代码在服务器端做什么,反过来又如何解决它.