用于处理大数据量(100 万条记录及更多)的数据结构和技术

Ser*_*gey 4 .net c# large-data data-structures

我一直在开发一个 WPF .NET 4.5 应用程序,最初适用于小数据量,现在适用于 100 万甚至更多的更大数据量,当然我开始耗尽内存。数据来自 MS SQL DB,数据处理需要加载到本地数据结构,因为这些数据随后由 CLR 中的代码转换/处理/引用,需要连续且不间断的数据访问,但并非所有数据都具有立即加载到内存中,但仅在实际访问时才加载。作为一个小例子,反距离插值器使用此数据来生成插值地图,并且所有数据都需要传递给它以进行连续网格生成。

我重写了应用程序的某些部分来处理数据,例如在任何给定时间仅加载 x 行数,并实现有效的数据处理滑动窗口方法。然而,为应用程序的其余部分执行此操作将需要一些时间投入,我想知道是否可以有一种更强大和标准的方法来解决这个设计问题(必须有,我不是第一个)?

太棒了;C# 是否提供任何数据结构或技术来以中断方式访问大量数据,因此它的行为类似于 IEnumerable,但数据在实际访问或需要之前并不在内存中,还是完全由我来管理内存使用?我的理想是一种结构,它可以自动实现类似缓冲区的机制,并在访问数据时加载更多数据,并从已访问且不再感兴趣的数据中释放内存。也许像一些带有内部缓冲区的数据表?

Jim*_*hel 5

至于迭代太大而无法放入内存的非常大的数据集,您可以使用生产者-消费者模型。当我处理包含数十亿条记录(数据总量约为 2 TB)的自定义数据集时,我使用了类似的方法。

这个想法是有一个包含生产者和消费者的单一类。当您创建该类的新实例时,它会启动一个生产者线程来填充受约束的并发队列。并且该线程使队列保持满。消费者部分是 API,可让您获取下一条记录。

您从共享并发队列开始。我喜欢 .NET BlockingCollection来实现这一点。

下面是一个读取文本文件并维护 10,000 个文本行的队列的示例。

public class TextFileLineBuffer
{
    private const int QueueSize = 10000;
    private BlockingCollection<string> _buffer = new BlockingCollection<string>(QueueSize);
    private CancellationTokenSource _cancelToken;
    private StreamReader reader;

    public TextFileLineBuffer(string filename)
    {
        // File is opened here so that any exception is thrown on the calling thread. 
        _reader = new StreamReader(filename);
        _cancelToken = new CancellationTokenSource();
        // start task that reads the file
        Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
    }

    public string GetNextLine()
    {
        if (_buffer.IsCompleted)
        {
            // The buffer is empty because the file has been read
            // and all lines returned.
            // You can either call this an error and throw an exception,
            // or you can return null.
            return null;
        }

        // If there is a record in the buffer, it is returned immediately.
        // Otherwise, Take does a non-busy wait.

        // You might want to catch the OperationCancelledException here and return null
        // rather than letting the exception escape.

        return _buffer.Take(_cancelToken.Token);
    }

    private void ProcessFile()
    {
        while (!_reader.EndOfStream && !_cancelToken.Token.IsCancellationRequested)
        {
            var line = _reader.ReadLine();
            try
            {
                // This will block if the buffer already contains QueueSize records.
                // As soon as a space becomes available, this will add the record
                // to the buffer.
                _buffer.Add(line, _cancelToken.Token);
            }
            catch (OperationCancelledException)
            {
                ;
            }
        }
        _buffer.CompleteAdding();
    }

    public void Cancel()
    {
        _cancelToken.Cancel();
    }
}
Run Code Online (Sandbox Code Playgroud)

这就是它的骨架。您需要添加一个 Dispose 方法来确保线程终止并关闭文件。

我在许多不同的程序中使用了这种基本方法,取得了良好的效果。您必须进行一些分析和测试以确定适合您的应用程序的最佳缓冲区大小。您需要足够大的东西来跟上正常的数据流并处理突发的活动,但又不能大到超出您的内存预算。

IE可枚举修改

如果你想支持IEnumerable<T>,你必须做一些小的修改。我将扩展我的示例来支持IEnumerable<String>.

首先,您必须更改类声明:

public class TextFileLineBuffer: IEnumerable<string>
Run Code Online (Sandbox Code Playgroud)

然后,您必须实现GetEnumerator

public IEnumerator<String> GetEnumerator()
{
    foreach (var s in _buffer.GetConsumingEnumerable())
    {
        yield return s;
    }
}

IEnumerator IEnumerable.GetEnumerator()
{
    return GetEnumerator();
}
Run Code Online (Sandbox Code Playgroud)

这样,您就可以初始化该事物,然后将其传递给任何需要IEnumerable<string>. 所以就变成了:

var items = new TextFileLineBuffer(filename);
DoSomething(items);

void DoSomething(IEnumerable<string> list)
{
    foreach (var s in list)
        Console.WriteLine(s);
}
Run Code Online (Sandbox Code Playgroud)