Thread-safe buffer of data for batch inserts of controlled size

use*_*648 6 .net c# parallel-processing parallel-extensions task-parallel-library

I have a simulation that generates data which must be saved to a database.

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);

});

The simulation generates a large amount of data, so generating it all first and then saving it to the database (up to 1 GB of data) is impractical, and saving it one item at a time makes no sense either (the transactions would be too small to be practical). I want to insert the items into the database as batch inserts of controlled size (say, committing 100 at a time).

However, I don't think my knowledge of parallel computing goes much beyond theory. I came up with this (which, as you can see, is very flawed):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, r == 10000000 - 1);

});

public class DataBuffer
{
    int count = 0;
    int limit = 100;

    object _locker = new object();

    ConcurrentBag<ComplexDataSet> _lastItemRef;

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataSetsQueue =
        new ConcurrentQueue<ConcurrentBag<ComplexDataSet>>();

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
            lock (_locker)
            {
                if(count >= limit)
                {
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    if(ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }

                    _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                else
                {
                    // First time
                    if(_lastItemRef == null)
                    {
                        _lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
                        ComplexDataSetsQueue.Enqueue(_lastItemRef);
                        count = 1;
                    }
                    // If buffer isn't full
                    else
                    {
                        _lastItemRef.Add(data);
                        count++;
                    }
                }

                if(isfinalcycle)
                {
                    // Commit everything that hasn't been committed yet
                    ConcurrentBag<ComplexDataSet> dequeueRef;
                    while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                    {
                        Commit(dequeueRef);
                    }
                }
            }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to database..should this be somehow in another thread or something ?
    }
}

As you can see, I use a queue to create a buffer and then manually decide when to commit. However, I have a strong feeling that this isn't a very effective solution to my problem. First, I'm not sure whether I'm locking correctly. Second, I'm not sure whether this is fully thread-safe (or thread-safe at all).

Could you please take a look and comment on what I should do differently? Or is there a better way to do this (using some kind of producer-consumer technique or something else)?

Thanks and best regards, D.

Cor*_*son 4

There's no need to use locks or expensive concurrency-safe data structures. The data items are all independent, so introducing locking and sharing will only hurt performance and scalability.

Parallel.For has an overload that lets you specify per-thread state. That's where you can keep a private queue and a private database connection.

Also: Parallel.For internally partitions your range into smaller chunks. Passing it a very large range is perfectly efficient, so there's nothing to change there.
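If you did want to control the chunking explicitly, a sketch using Partitioner.Create is one option. Here GenerateData comes from the question; CommitBatch is a hypothetical method that writes one batch to the database:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Split the index range into chunks of 100; each chunk becomes one batch.
var ranges = Partitioner.Create(0, 1000000, 100);

Parallel.ForEach(ranges, range =>
{
    var batch = new List<ComplexDataSet>(100);

    // range.Item1 is the inclusive start, range.Item2 the exclusive end.
    for (int i = range.Item1; i < range.Item2; i++)
    {
        batch.Add(GenerateData(i));
    }

    // Hypothetical helper: commits one 100-item batch in a single insert.
    CommitBatch(batch);
});
```

This keeps one transaction per chunk without any shared buffer, at the cost of fixing the batch size up front.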

Parallel.For(0, 10000000, () => new ThreadState(),
    (i, loopstate, threadstate) =>
{
    ComplexDataSet data = GenerateData(i);

    threadstate.Add(data);

    return threadstate;
}, threadstate => threadstate.Dispose());

sealed class ThreadState : IDisposable
{
    // Placeholder for your actual database connection type; it needs
    // to expose Write() and implement IDisposable.
    readonly IDisposable db;
    readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

    public ThreadState()
    {
        // initialize db with a private MongoDb connection.
    }

    public void Add(ComplexDataSet cds)
    {
        queue.Enqueue(cds);

        if(queue.Count == 100)
        {
            Commit();
        }
    }

    void Commit()
    {
        db.Write(queue);
        queue.Clear();
    }

    public void Dispose()
    {
        try
        {
            if(queue.Count > 0)
            {
                Commit();
            }
        }
        finally
        {
            db.Dispose();
        }
    }
}

Now, MongoDb currently doesn't support truly concurrent inserts -- it holds some expensive locks in the server, so parallel commits won't gain you much (if any) speed. They want to fix this in the future, so you may get a free speed-up one day.

If you need to limit the number of database connections held, a producer/consumer setup is a good alternative. You can use a BlockingCollection queue to do this efficiently without using any locks:

// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.

BlockingCollection<ComplexDataSet> commits =
    new BlockingCollection<ComplexDataSet>(1000);

Task consumer = Task.Factory.StartNew(() =>
    {
        // This is the consumer.  It processes the
        // "commits" queue until it signals completion.

        while(!commits.IsCompleted)
        {
            ComplexDataSet cds;

            // Timeout of -1 will wait for an item or IsCompleted == true.

            if(commits.TryTake(out cds, -1))
            {
                // Got at least one item, write it.
                db.Write(cds);

                // Continue dequeuing until the queue is empty, where it will
                // timeout instantly and return false, or until we've dequeued
                // 100 items.

                for(int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
                {
                    db.Write(cds);
                }

                // Now that we're waiting for more items or have dequeued 100
                // of them, commit.  More items can continue to be added to
                // the queue by other threads while this commit is processing.

                db.Commit();
            }
        }
    }, TaskCreationOptions.LongRunning);

try
{
    // This is the producer.

    Parallel.For(0, 1000000, i =>
        {
            ComplexDataSet data = GenerateData(i);
            commits.Add(data);
        });
}
finally // put in a finally to ensure the task closes down.
{
    commits.CompleteAdding(); // sets commits.IsAddingCompleted = true.
    consumer.Wait(); // wait for task to finish committing all the items.
}