Jon*_*nik 10 c# datatable ado.net multithreading
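I read this answer, ADO.NET DataTable/DataRow Thread Safety, and couldn't understand some of it. In particular, I couldn't understand the [2] article. What kind of wrapper do I need to use? Could anyone give an example?
I also couldn't understand what the author means when talking about cascading locks and full locking. Please give an example.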
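To make the question concrete, here is one plausible reading of a "full locking" wrapper: the DataTable is never touched directly, and every access, reads included, goes through a single lock. This is a minimal sketch under that assumption, not the wrapper from the linked article; `LockedDataTable`, `AddRow`, `Count` and `Read` are hypothetical names.

```csharp
using System;
using System.Data;

// Hypothetical wrapper: every operation on the wrapped DataTable, reads included,
// is serialized through one private lock object ("full locking").
public sealed class LockedDataTable
{
    private readonly DataTable table;
    private readonly object sync = new object();

    public LockedDataTable(DataTable table) { this.table = table; }

    // Creates, fills and attaches the row inside one critical section.
    public void AddRow(params object[] values)
    {
        lock (sync)
        {
            table.Rows.Add(values);
        }
    }

    public int Count
    {
        get { lock (sync) { return table.Rows.Count; } }
    }

    // Runs arbitrary read logic while holding the lock; callers must not let
    // DataRow references escape the delegate, or the protection is lost.
    public T Read<T>(Func<DataTable, T> reader)
    {
        lock (sync) { return reader(table); }
    }
}
```

The cost of this design is that all threads queue on one lock, so it only helps when the work done outside the lock (parsing, I/O) dominates.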
Mar*_*ell 19
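DataTable is simply not designed or intended for concurrent usage (in particular where any form of mutation is involved). In my view, the advisable "wrapper" here is either:
- remove the need to work on the DataTable concurrently (when mutation is involved), or:
- remove the DataTable, instead using a data structure that either directly supports what you need (for example a concurrent collection), or which is much simpler and can be trivially synchronized (either exclusive or reader/writer)

Basically: change the problem.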
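As an illustration of the "concurrent collection" option, the sketch below parses lines in parallel and collects the results in a `ConcurrentQueue<T>`, which is built for concurrent writers, so no explicit lock and no DataTable are involved. `ParsedLine` and `ConcurrentParse` are hypothetical stand-ins for whatever the real record type and parser are.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical record type standing in for "one parsed line".
public sealed class ParsedLine
{
    public string Text { get; }
    public int Length { get; }
    public ParsedLine(string text) { Text = text; Length = text.Length; }
}

public static class ConcurrentParse
{
    // Parses lines in parallel; ConcurrentQueue<T> is safe for concurrent Enqueue,
    // so the loop body needs no lock.
    public static List<ParsedLine> ParseAll(IEnumerable<string> lines)
    {
        var results = new ConcurrentQueue<ParsedLine>();
        Parallel.ForEach(lines, line => results.Enqueue(new ParsedLine(line)));
        // Parallel.ForEach returns only after every iteration has finished,
        // so the queue is complete here. Order is NOT guaranteed.
        return new List<ParsedLine>(results);
    }
}
```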
From the comments:
The code looks like this:
Parallel.ForEach(strings, str => {
    DataRow row;
    lock (table) {
        row = table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock (table) {
        table.Rows.Add(row);
    }
});
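A quick aside on the `out row` call in that code: an `out` parameter must be assigned by the callee, which cannot use the object the caller passed in, so a row created beforehand with `NewRow()` is overwritten rather than filled. The difference is shown below with a `List<string>` standing in for `DataRow` (a simplification so no DataTable is needed); `OutDemo` and its methods are hypothetical.

```csharp
using System.Collections.Generic;

public static class OutDemo
{
    // With 'out', the callee must assign a value; the caller's existing object
    // is never seen, only replaced in the caller's variable.
    public static void FillOut(string s, out List<string> row)
    {
        row = new List<string> { s };
    }

    // Passing the reference normally lets the callee populate the caller's object.
    public static void FillExisting(string s, List<string> row)
    {
        row.Add(s);
    }
}
```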
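I can only hope that `out row` is a typo, as that won't actually lead to it populating the row created via NewRow(). But if you absolutely have to use this approach, you can't use NewRow, as the pending row is kind of shared. Your best bet would be: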
Parallel.ForEach(strings, str => {
    object[] values = MyParser.Parse(str);
    lock (table) {
        table.Rows.Add(values);
    }
});
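The important change above is that the lock covers the entire new-row process. Note that you have no guarantee of order when using Parallel.ForEach like this, so it is important that the final order does not need to match exactly (which shouldn't be a problem if the data includes a time component).
However! I still think you are approaching this the wrong way: for parallelism to be relevant, it must be non-trivial data. If you have non-trivial data, you really don't want to have to buffer it all in memory. I strongly suggest doing something like the following, which will work fine on a single thread: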
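If order did matter, one common workaround (not part of the answer) is to tag each item with its source position, using the `(item, state, index)` overload of `Parallel.ForEach`, and sort once at the end. A minimal sketch; `OrderedParallel` and the `ToUpperInvariant` call standing in for real parsing are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedParallel
{
    // Processes items in parallel, then restores input order by sorting on the
    // index that Parallel.ForEach supplies for each source element.
    public static List<string> ProcessInOrder(IList<string> items)
    {
        var tagged = new List<KeyValuePair<long, string>>(items.Count);
        var sync = new object();
        Parallel.ForEach(items, (item, state, index) =>
        {
            string result = item.ToUpperInvariant(); // stand-in for real parsing
            lock (sync)
            {
                tagged.Add(new KeyValuePair<long, string>(index, result));
            }
        });
        tagged.Sort((a, b) => a.Key.CompareTo(b.Key));
        var ordered = new List<string>(tagged.Count);
        foreach (var pair in tagged) ordered.Add(pair.Value);
        return ordered;
    }
}
```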
// ObjectReader comes from the FastMember library; connectionString is assumed to exist
using(var bcp = new SqlBulkCopy(connectionString))
using(var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTableName = "MyLog";
    bcp.WriteToServer(reader);
}
// ...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
// ...
public sealed class LogRow {
    /* define your schema here */
}
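The TODO in ParseFile depends on the log format, which the question doesn't show. Assuming, purely for illustration, tab-separated lines of the form `timestamp<TAB>message`, the parser could be filled in as below. The two `LogRow` properties and the `Parse(TextReader)` overload are hypothetical; the overload only exists so the parsing can be exercised without a file.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

// Hypothetical schema: each log line is "<ISO-8601 timestamp>\t<message>".
public sealed class LogRow
{
    public DateTime Timestamp { get; set; }
    public string Message { get; set; }
}

public static class LogParser
{
    public static IEnumerable<LogRow> ParseFile(string path)
    {
        using (var reader = File.OpenText(path))
        {
            foreach (var row in Parse(reader)) yield return row;
        }
    }

    // Lazily yields one LogRow per non-empty line; nothing is buffered.
    public static IEnumerable<LogRow> Parse(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            if (line.Length == 0) continue; // skip blank lines
            int tab = line.IndexOf('\t');
            if (tab < 0) throw new FormatException("Expected a tab in: " + line);
            yield return new LogRow
            {
                Timestamp = DateTime.Parse(line.Substring(0, tab),
                    CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind),
                Message = line.Substring(tab + 1)
            };
        }
    }
}
```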
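Benefits:
- no buffering: this is a fully streaming operation (yield return does not put things into a list or similar)
- avoids the overheads of DataTable, which is overkill here: because it is so flexible, it has significant overheads

I do a lot of things like the above in my own work, and from experience it is usually at least twice as fast as populating a DataTable in memory first.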
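The "no buffering" point is easy to check: an iterator method runs only as far as the consumer has asked. In this minimal sketch (`StreamingDemo` is hypothetical), a million items are requested, yet after one `MoveNext()` only one has been produced.

```csharp
using System.Collections.Generic;

public static class StreamingDemo
{
    // Counts how many items the producer has generated so far.
    public static int Produced;

    // An iterator: each item is created only when the consumer asks for it,
    // so nothing accumulates in a list.
    public static IEnumerable<int> Produce(int count)
    {
        for (int i = 0; i < count; i++)
        {
            Produced++;
            yield return i;
        }
    }
}
```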
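Finally, here is an example of an IEnumerable<T> implementation that accepts concurrent readers and writers without requiring everything to be buffered in memory. That would allow multiple threads to parse the data (calling Add, and finally Close), with a single thread for SqlBulkCopy via the IEnumerable<T> API: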
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish.
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private bool closed; // only read or written while holding the queue lock

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");
            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming
                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        ThreadPool.QueueUserWorkItem(delegate
        {
            int count = 0, sum = 0;
            foreach (var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        Thread.Sleep(100); // give the consumer a moment to finish printing
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();
    }
}
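For comparison, the BCL's `BlockingCollection<T>` (System.Collections.Concurrent) offers the same producer/consumer shape: `Add` from many threads, `CompleteAdding()` in place of `Close()`, and `GetConsumingEnumerable()` as the blocking iterator. This is a swapped-in alternative rather than the answer's own class; `BlockingCollectionDemo` is hypothetical and mirrors the demo's sum check.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Producers Add in parallel; CompleteAdding plays the role of Close();
    // GetConsumingEnumerable blocks while empty and ends once adding is complete.
    public static long SumViaBlockingCollection(int count)
    {
        using (var bucket = new BlockingCollection<int>())
        {
            Task<long> consumer = Task.Run(() =>
            {
                long sum = 0;
                foreach (int item in bucket.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            Parallel.For(0, count,
                new ParallelOptions { MaxDegreeOfParallelism = 3 },
                i => bucket.Add(i));
            bucket.CompleteAdding();

            return consumer.Result; // waits until the consumer has drained everything
        }
    }
}
```

Waiting on the consumer task, instead of `Thread.Sleep(100)`, makes the final total deterministic.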
Viewed 19792 times