使用 Task.WhenAll 跨多个线程的 TransactionScope

Ran*_*nan 5 c# multithreading transactionscope task-parallel-library async-await

我正在尝试使用 Task.WhenAll 对数据库进行多个并行更新。代码流程是这样的。

在主方法中,我创建了一个事务范围并创建了主事务的克隆并传递给子级。主交易被阻止,直到子交易完成

using (var scope = DalcHelper.GetTransactionScope())
{
    DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task1= Dalc.UpdateDetails1(transaction );

    DependentTransaction transaction1 = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task2 = Dalc.UpdateDetails2(transaction1);

    await Task.WhenAll(task1, task2 ).ConfigureAwait(false);

    scope.Complete();
}
Run Code Online (Sandbox Code Playgroud)

DalcMethod 是这样的。这里从外部事务创建的克隆作为参数。从属事务完成通知主事务从属事务已完成

try
{
    using (SqlCommand databaseCommand = DalcHelper.GetCommand(SPName))
    using (var scope = new TransactionScope(dependentCloneTransaction, TransactionScopeAsyncFlowOption.Enabled))
    {
        -- Update database
        scope.Complete();
    }
}
finally
{
    //Call complete on the dependent transaction
    dependentCloneTransaction.Complete();
}
Run Code Online (Sandbox Code Playgroud)

Dalc 方法是返回 Task 的异步方法

我收到以下异常

事务已中止。尝试提升事务时失败。已经有一个与此命令关联的打开的 DataReader,必须先将其关闭。等待操作超时

。谁能告诉我我在这里做错了什么?

小智 0

namespace Playground
{
    static class DalcHelper
    {
        public static TransactionScope GetTransactionScope()
        {
            return new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        }

        public async static Task ReadDetails1(DependentTransaction transaction,SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons"; // some table, say Persons
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int Id = reader.GetInt32("Id");
                        Console.WriteLine("Id " + Id);
                    }
                    reader.Close();
                }
                transaction.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 1"+ ex.Message);
            }
        }

        public async static Task ReadDetails2(DependentTransaction transaction1, SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons";
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int age = reader.GetInt32("Age");
                        Console.WriteLine("Age " + age);
                    }
                    reader.Close();
                }
                transaction1.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 2" + ex.Message);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            string connectionString = "YourConnectionString";
            _ = RunMe(connectionString);
        }

        private async static Task RunMe(string connectionString)
        {
            
                try
                {
                    
                    Task task1 = Task.Run( async()=> {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails1(transaction, conn);
                                /*
                                * add more tasks if you wish to
                                */
                                Console.WriteLine("Completed task 1");
                                conn.Close();

                            }
                            scope.Complete();
                        }
                    });

                    

                    Task task2 = Task.Run(async () =>
                    {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails2(transaction, conn);
                                /*
                                    may be update some column of table based on previous op.
                                   // await DalcHelper.UpdateDetails2(transaction, conn); 
                                */ 
                                Console.WriteLine("Completed task 2");
                                conn.Close();
                            }
                            /*
                            calling `Complete` method will commit all the changes within the transaction scope(including the UpdateDetails2 method)
                            need not dispose transaction scope explicitly, `using` block takes care of that
                            */ 
                            scope.Complete(); 
                        }
                    });

                 await Task.WhenAll(task1, task2);// at this point every task added is complete
                 Console.WriteLine("completed both tasks");
                 Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

使用事务范围时要记住的一些要点

  1. 需要TransactionScope在创建它的同一线程中进行处理,否则可能会抛出类似的错误Transaction already aborted
  2. TransactionScope.Complete()仅当调用方法时,才会保留任何更新操作。
  3. 确保为每个线程打开单独的连接,并在使用后关闭它。话虽如此,从性能的角度来看,我不确定是否为每个线程使用单独的连接。我很高兴在这方面接受更多教育,我会更新我的答案。但是,此解决方案应该可以帮助您解决问题。

请阅读一些已经发布的与该主题相关的有用答案