I have a simulation that generates data which must be saved to a database.
    ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
    {
        ComplexDataSet cds = GenerateData(r);
        SaveDataToDatabase(cds);
    });
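The simulation generates a lot of data (up to 1 GB), so it is impractical to generate everything first and save it afterwards. Saving the items to the database one at a time makes no sense either, because the transactions would be too small to be practical. I want to insert them as bulk inserts of a controlled size (say, 100 items per commit).
However, my knowledge of parallel computing is rather thin. I came up with this (which, as you can see, is very flawed):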
    DataBuffer buffer = new DataBuffer(...);
    ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
    {
        ComplexDataSet cds = GenerateData(r);
        // Flag the final index so the buffer commits whatever is left.
        buffer.SaveDataToBuffer(cds, r == 10000000 - 1);
    });
    public class DataBuffer
    {
        int count = 0;
        int limit = 100;
        object _locker = new object();
        ConcurrentBag<ComplexDataSet> _lastItemRef;
        ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataSetsQueue { get; set; }

        public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
        {
            lock (_locker)
            {
                if (count >= limit)
                {
                    // Current bag is full: commit the oldest bag, then start a new one
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    if (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                    _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                else
                {
                    // First time
                    if (_lastItemRef == null)
                    {
                        _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                        ComplexDataSetsQueue.Enqueue(_lastItemRef);
                        count = 1;
                    }
                    // If buffer isn't full
                    else
                    {
                        _lastItemRef.Add(data);
                        count++;
                    }
                }
                if (isfinalcycle)
                {
                    // Commit everything that hasn't been committed yet
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                }
            }
        }

        public void Commit(ConcurrentBag<ComplexDataSet> data)
        {
            // Commit data to database..should this be somehow in another thread or something ?
        }
    }
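As you can see, I use a queue to build a buffer and then decide manually when to commit. However, I have a strong feeling that this is not a very efficient solution to my problem. First, I am not sure whether I am locking correctly. Second, I am not sure whether this is fully thread-safe (or thread-safe at all).
Could you take a look and comment on what I should do differently? Or is there a better way to do this (using some kind of producer-consumer technique or something else)?
Thanks and best wishes, D.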
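There is no need to use locks or expensive concurrency-safe data structures. The data items are all independent, so introducing locking and sharing will only hurt performance and scalability.
Parallel.For has an overload that lets you specify per-thread data. You can store a private queue and a private database connection in it.
Also: Parallel.For internally partitions your range into smaller chunks. Passing it one large range is perfectly efficient, so nothing needs to change there.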
    Parallel.For(0, 10000000, () => new ThreadState(),
        (i, loopstate, threadstate) =>
        {
            ComplexDataSet data = GenerateData(i);
            threadstate.Add(data);
            return threadstate;
        }, threadstate => threadstate.Dispose());
    sealed class ThreadState : IDisposable
    {
        // Placeholder type: stands in for a connection object that also exposes Write().
        readonly IDisposable db;
        readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

        public ThreadState()
        {
            // initialize db with a private MongoDb connection.
        }

        public void Add(ComplexDataSet cds)
        {
            queue.Enqueue(cds);
            if (queue.Count == 100)
            {
                Commit();
            }
        }

        void Commit()
        {
            db.Write(queue);
            queue.Clear();
        }

        public void Dispose()
        {
            try
            {
                // Flush the final, partially filled batch.
                if (queue.Count > 0)
                {
                    Commit();
                }
            }
            finally
            {
                db.Dispose();
            }
        }
    }
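To check the batching behaviour of the thread-local overload without a database, here is a minimal sketch that swaps the connection for a counter. `BatchCounter`, `BatchingState` and `ThreadLocalBatchingDemo` are hypothetical names made up for this illustration, and plain `int` values stand in for `ComplexDataSet`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the database: it only counts what gets "committed".
sealed class BatchCounter
{
    private long items;
    private long batches;

    public long Items => Interlocked.Read(ref items);
    public long Batches => Interlocked.Read(ref batches);

    public void Record(int batchSize)
    {
        Interlocked.Add(ref items, batchSize);
        Interlocked.Increment(ref batches);
    }
}

// Private state for one loop task: a plain list, so no locking is needed.
sealed class BatchingState
{
    private readonly List<int> pending = new List<int>();
    private readonly int batchSize;
    private readonly BatchCounter counter;

    public BatchingState(int batchSize, BatchCounter counter)
    {
        this.batchSize = batchSize;
        this.counter = counter;
    }

    public void Add(int item)
    {
        pending.Add(item);
        if (pending.Count >= batchSize)
        {
            Flush();
        }
    }

    // Commits whatever is pending; a no-op when nothing is buffered.
    public void Flush()
    {
        if (pending.Count == 0) return;
        counter.Record(pending.Count);
        pending.Clear();
    }
}

static class ThreadLocalBatchingDemo
{
    public static BatchCounter Run(int total, int batchSize)
    {
        var counter = new BatchCounter();
        Parallel.For(0, total,
            () => new BatchingState(batchSize, counter),              // localInit
            (i, loopState, state) => { state.Add(i); return state; }, // body
            state => state.Flush());                                  // localFinally
        return counter;
    }
}
```

Calling `ThreadLocalBatchingDemo.Run(1050, 100)` should record all 1050 items. The number of batches is at least 11 but varies from run to run, because the local state is created once per task taking part in the loop, and each task can leave one partially filled batch behind.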
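Note that, at the time of writing, MongoDb does not support truly concurrent inserts: it holds some expensive locks in the server, so parallel commits will not gain you much speed, if any. They want to fix this in the future, so you might get a free speed-up one day.
If you need to limit the number of database connections held, a producer/consumer setup is a good alternative. You can use a BlockingCollection queue to do this efficiently without writing any locks yourself: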
    // Specify a maximum of 1000 items in the collection so that we don't
    // run out of memory if we get data faster than we can commit it.
    // Add() will wait if it is full.
    BlockingCollection<ComplexDataSet> commits =
        new BlockingCollection<ComplexDataSet>(1000);

    Task consumer = Task.Factory.StartNew(() =>
    {
        // This is the consumer. It processes the
        // "commits" queue until it signals completion.
        while (!commits.IsCompleted)
        {
            ComplexDataSet cds;
            // Timeout of -1 waits until an item arrives; it returns false
            // once adding is completed and the queue is empty.
            if (commits.TryTake(out cds, -1))
            {
                // Got at least one item, write it.
                db.Write(cds);
                // Continue dequeuing until the queue is empty, where it will
                // time out instantly and return false, or until we've dequeued
                // 100 items.
                for (int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
                {
                    db.Write(cds);
                }
                // Now that the queue is empty or we have dequeued 100 items,
                // commit. Other threads can continue to add to the queue
                // while this commit is processing.
                db.Commit();
            }
        }
    }, TaskCreationOptions.LongRunning);

    try
    {
        // This is the producer.
        Parallel.For(0, 1000000, i =>
        {
            ComplexDataSet data = GenerateData(i);
            commits.Add(data);
        });
    }
    finally // put in a finally to ensure the task closes down.
    {
        commits.CompleteAdding(); // sets commits.IsAddingCompleted = true.
        consumer.Wait(); // wait for task to finish committing all the items.
    }
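For experimenting with the producer/consumer shape without a database, a self-contained variant might look like the sketch below. It is not the code above: it uses `GetConsumingEnumerable` instead of the `TryTake` loop, it commits only when a batch is full or when the producers have finished (rather than whenever the queue momentarily runs dry), and it records batch sizes in a list instead of writing anywhere. `ProducerConsumerDemo` is a hypothetical name, and plain `int` values stand in for `ComplexDataSet`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ProducerConsumerDemo
{
    // Returns the sizes of the batches the consumer "committed".
    public static List<int> Run(int total, int batchSize, int capacity)
    {
        var batchSizes = new List<int>(); // touched only by the consumer task

        using (var queue = new BlockingCollection<int>(capacity))
        {
            Task consumer = Task.Factory.StartNew(() =>
            {
                var batch = new List<int>(batchSize);
                // Blocks while the queue is empty and ends once
                // CompleteAdding has been called and the queue has drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    batch.Add(item);
                    if (batch.Count == batchSize)
                    {
                        batchSizes.Add(batch.Count); // stand-in for a real commit
                        batch.Clear();
                    }
                }
                // Flush the final, partially filled batch.
                if (batch.Count > 0)
                {
                    batchSizes.Add(batch.Count);
                }
            }, TaskCreationOptions.LongRunning);

            try
            {
                // Producers: Add blocks while the queue is at capacity.
                Parallel.For(0, total, i => queue.Add(i));
            }
            finally
            {
                queue.CompleteAdding();
                consumer.Wait();
            }
        }

        return batchSizes;
    }
}
```

With a single consumer the batch boundaries depend only on how many items pass through, so `ProducerConsumerDemo.Run(1050, 100, 1000)` should yield ten batches of 100 followed by one of 50. The trade-off against the `TryTake` version is latency: items can sit in a partial batch until more arrive, which is fine for a bulk load but less so for a trickle of data.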