DataTable的线程安全性

Jon*_*nik 10 c# datatable ado.net multithreading

我曾经读过这个答案ADO.NET DataTable/DataRow Thread Safety,并且无法理解一些东西.特别是我无法理解[2]文章.我需要使用什么样的包装?谁能举个例子?

另外我无法理解作者的意思是谈论级联锁和完全锁定.请举例.

Mar*_*ell 19

DataTable根本没有设计或打算用于并发使用(特别是在涉及任何形式的突变的情况下).在我看来,这里可取的"包装"是:

  • 删除需要同时工作DataTable(当涉及变异时),或:
  • 删除DataTable,而是使用直接支持您需要的数据结构(例如并发集合),或者更简单并且可以简单地同步(独占或读取/写入)的数据结构

基本上:改变问题.


来自评论:

代码如下:

Parallel.ForEach(strings, str=>
{
    DataRow row;
    lock(table){
        row= table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock(table){
        table.Rows.Add(row)
    }
});
Run Code Online (Sandbox Code Playgroud)

我只能希望这out row是一个错字,因为这实际上不会导致它填充通过创建的行NewRow(),但是:如果你绝对必须使用这种方法,你就不能使用NewRow,因为挂起的行有点共享.你最好的选择是:

Parallel.ForEach(strings, str=> {
    object[] values = MyParser.Parse(str);
    lock(table) {
        table.Rows.Add(values);
    }
});
Run Code Online (Sandbox Code Playgroud)

上面的重要变化是lock覆盖整个新行的过程.请注意,在这样使用时,您无法保证订单Parallel.ForEach,因此最终订单不需要完全匹配(如果数据包含时间组件,这不应该是问题).

然而!我仍然认为你正在以错误的方式接近它:因为并行性是相关的,它必须是非平凡的数据.如果你有非平凡的数据,你真的不想在内存中缓冲它.我强烈建议做类似以下的事情,它可以在单个线程上正常工作:

using(var bcp = new SqlBulkCopy())
using(var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTable = "MyLog";
    bcp.WriteToServer(reader);    
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}
Run Code Online (Sandbox Code Playgroud)

好处:

  • 没有缓冲 - 这是一个完全流式传输操作(yield return不会把东西放到列表或类似的东西)
  • 因此,行可以立即开始流式传输,而无需等待整个文件首先进行预处理
  • 没有内存饱和问题
  • 没有线程并发症/开销
  • 你可以保留原始订单(通常不是关键,但很好)
  • 您只受到读取原始文件的速度的限制,这在单个线程上通常比从多个线程更快(单个IO设备上的争用只是开销)
  • 避免所有开销DataTable,这在这里是过度的 - 因为它是如此灵活,它有很大的开销
  • read(从日志文件中)和write(到数据库)现在是并发的而不是顺序的

我在自己的工作中做了很多像^^^这样的事情,从经验来看,它通常至少DataTable首先在内存中填充的速度快两倍.


最后 - 这是一个IEnumerable<T>接受并发读取器和编写器的实现的示例,而不需要在内存中缓存所有内容 - 这将允许多个线程通过API 使用单个线程解析数据(调用Add并最终Close):SqlBulkCopyIEnumerable<T>

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");

            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }
    private bool closed;

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming

                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        ThreadPool.QueueUserWorkItem(delegate
        {
            int count = 0, sum = 0;
            foreach(var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        Thread.Sleep(100);
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();


    }

}
Run Code Online (Sandbox Code Playgroud)

  • @basketballfan22“操作系统预取” (3认同)
  • @ basketballfan22它们是单独的设备;如果您先进行所有读取,再进行所有写入,那么就不会有任何重叠-但是:如果您读取*一些*数据,然后开始写入...则当您在磁盘IO上等待时,网络IO正在静默缓冲,这意味着当您返回以获取更多数据时,很有可能已经存在该数据;同样,对于磁盘写缓存,当您用尽DB / NIC输入缓冲区并“实际上”等待更多时间时,磁盘将刷新自身。一切都很好-甚至在*之前*,我们也开始添加`async`之类的细节。 (2认同)