有没有办法将任务并行库(TPL)与SQLDataReader一起使用?

Rod*_*ley 20 c# ado.net .net-4.0 task-parallel-library

我喜欢TPL中Parallel.For和Parallel.ForEach扩展方法的简单性.我想知道是否有办法利用类似的东西,甚至是稍微高级的任务.

下面是SqlDataReader的典型用法,我想知道是否可能,如果是这样,如何用TPL中的东西替换下面的while循环.因为读者不能提供固定数量的迭代,所以不能使用For扩展方法,这样就可以处理我将收集的任务.我希望有人可能已经解决了这个问题,然后找出了一些与ADO.net不同的事情.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Ree*_*sey 25

你将很难直接替换while循环. SqlDataReader不是线程安全类,因此您无法直接从多个线程使用它.

话虽这么说,您可以使用TPL 处理您读取的数据.这里有几个选项.最简单的方法是创建IEnumerable<T>适用于阅读器的实现,并返回包含数据的类或结构.然后,您可以使用PLINQ或Parallel.ForEach语句并行处理数据:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

拥有该方法后,您可以通过PLINQ或TPL直接处理:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});
Run Code Online (Sandbox Code Playgroud)

要么:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});
Run Code Online (Sandbox Code Playgroud)


Joe*_*orn 19

你快到了.使用此签名包裹您在函数中发布的代码:

IEnumerable<IDataRecord> MyQuery()
Run Code Online (Sandbox Code Playgroud)

然后用以下// Do something with Reader代码替换你的代码:

yield return reader;
Run Code Online (Sandbox Code Playgroud)

现在你有一些在单个线程中工作的东西.不幸的是,当您阅读查询结果时,它每次都返回对同一对象的引用,并且对象只是为每次迭代而改变自身.这意味着如果你尝试并行运行它会得到一些非常奇怪的结果,因为并行读取会改变不同线程中使用的对象.您需要代码来获取记录的副本以发送到并行循环.

但是,在这一点上,我喜欢做的是跳过记录的额外副本并直接进入强类型类.更重要的是,我喜欢使用通用方法来做到这一点:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

假设您的工厂方法按预期创建副本,则此代码应该可以安全地在Parallel.ForEach循环中使用.调用该方法看起来像这样(假设一个Employee类具有名为"Create"的静态工厂方法):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
Run Code Online (Sandbox Code Playgroud)

重要更新:
我对此代码的信心并不像以前那么自信.一个单独的线程仍然可以改变读者,而另一个线程正在进行复制.我可以锁定它,但我也担心另一个线程可以调用更新读取器后原始自己调用Read()但在它开始复制之前.因此,这里的关键部分包含整个while循环......此时,您又回到了单线程.我希望有一种方法可以修改此代码,以便在多线程方案中按预期工作,但需要更多学习.

  • 这里有一些不错的功能代码,但是关于并行运行它的好处,我不确定有没有.瓶颈可能是实际的db调用,它不是并行运行的. (4认同)