IEnumerable <T>,Parallel.ForEach和Memory Management

sno*_*FFF 9 c# sql-server memory-management task-parallel-library

我正在阅读和处理大量的Sql Server数据(数百万行中有10行,100多万行+行).在每个源行上执行的处理很重要.单线程版本没有达到预期效果.我目前的并行处理版本在一些较小的批次(300,000个源行,1M个输出行)上表现非常好,但是我遇到了一些非常大的运行的Out of Memory异常.

代码受到了这里提供的答案的启发: 有没有办法将任务并行库(TPL)与SQLDataReader一起使用?

这是一般的想法:

获取源数据(数据太大而无法读入内存,因此我们将"流式传输")

public static IEnumerable<MyObject> ReadData()
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString)) 
       using (SqlCommand cmd = new SqlCommand(selectionSql, con))
       {
            con.Open();
            using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
            {
            while (dr.Read())
            {
                // make some decisions here – 1 to n source rows are used
                // to create an instance of MyObject
                yield return new MyObject(some parameters);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

一旦我们到达并行处理点,我们希望使用SqlBulkCopy对象来写入数据.因此,我们不希望并行处理单个MyObjects,因为我们希望每个线程执行批量复制.因此,我们将从上面读取另一个返回"批量"MyObjects的IEnumerable

class MyObjectBatch 
{
    public List<MyObject> Items { get; set; }

    public MyObjectBatch (List<MyObject> items)
    {
        this.Items = items;
    }

    public static IEnumerable<MyObjectBatch> Read(int batchSize)
    {
        List<MyObject> items = new List<MyObjectBatch>();
        foreach (MyObject o in DataAccessLayer.ReadData())
        {
            items.Add(o);
            if (items.Count >= batchSize)
            {
                yield return new MyObjectBatch(items);                    
                items = new List<MyObject>(); // reset
            }
        }
        if (items.Count > 0) yield return new MyObjectBatch(items);            
    }
}
Run Code Online (Sandbox Code Playgroud)

最后,我们达到并行处理"批次"的程度

ObjectProcessor processor = new ObjectProcessor();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };
Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
    // Create a container for data processed by this thread
    // the container implements IDataReader
    ProcessedData targetData = new ProcessedData(some params));

    // process the batch… for each MyObject in MyObjectBatch – 
    // results are collected in targetData
    for (int index = 0; index < batch.Items.Count; index++) 
    {
        processor.Process(batch.Item[index], targetData);
    }

    // bulk copy the data – this creates a SqlBulkCopy instance
    // and loads the data to the target table
    DataAccessLayer.BulkCopyData(targetData);

    // explicitly set the batch and targetData to null to try to free resources

});
Run Code Online (Sandbox Code Playgroud)

以上所有内容都已大大简化,但我相信它包含了所有重要概念.这是我看到的行为:

性能非常好 - 对于合理大小的数据集,我得到了非常好的结果.

但是,随着它的处理,消耗的内存继续增长.对于较大的数据集,这会导致异常.

我通过日志记录证明,如果我减慢数据库的读取速度,它会减慢批量读取的速度,然后创建并行线程(特别是如果我设置了MaxDegreeOfParallelization).我担心我的阅读速度比我能处理的要快,但是如果我限制线程,它应该只读取每个线程可以处理的内容.

较小或较大的批量大小对性能有一些影响,但使用的内存量与批量大小一致.

哪里有机会在这里恢复一些记忆?由于我的"批次"超出范围,是否应恢复记忆?我可以在前两个层面做些什么来帮助释放一些资源吗?

回答一些问题:1.它可以纯粹用SQL完成 - 不,处理逻辑非常复杂(和动态).一般来说,它正在进行低级二进制解码.我们尝试过SSIS(取得了一些成功).问题是源数据的定义以及输出是非常动态的.SSIS似乎需要非常严格的输入和输出列定义,在这种情况下不起作用.

有人还问过ProcessedData对象 - 实际上这很简单:

class ProcessedData : IDataReader 
{
    private int _currentIndex = -1;
    private string[] _fieldNames { get; set; }

    public string TechnicalTableName { get; set; }        
    public List<object[]> Values { get; set; }

    public ProcessedData(string schemaName, string tableName, string[] fieldNames)
    {            
        this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
        _fieldNames = fieldNames;            
        this.Values = new List<object[]>();
    }

    #region IDataReader Implementation

    public int FieldCount
    {
        get { return _fieldNames.Length; }
    }

    public string GetName(int i)
    {
        return _fieldNames[i];
    }

    public int GetOrdinal(string name)
    {
        int index = -1;
        for (int i = 0; i < _fieldNames.Length; i++)
        {
            if (_fieldNames[i] == name)
            {
                index = i;
                break;
            }
        }
        return index;
    }

    public object GetValue(int i)
    {
        if (i > (Values[_currentIndex].Length- 1))
        {
            return null;
        }
        else
        {
            return Values[_currentIndex][i];
        }
    }

    public bool Read()
    {
        if ((_currentIndex + 1) < Values.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }

    // Other IDataReader things not used by SqlBulkCopy not implemented
}
Run Code Online (Sandbox Code Playgroud)

更新和结论:

我收到了大量有价值的意见,但希望将其总结为一个结论.首先,我的主要问题是,是否还有其他任何事情(我发布的代码)可以积极地回收内存.共识似乎是方法是正确的,但我的特定问题并不完全受CPU限制,因此简单的Parallel.ForEach将无法正确管理处理.

感谢usr的调试建议以及他非常有趣的PLINQ建议.感谢zmbq帮助澄清什么是和未发生的事情.

最后,任何可能正在追逐类似问题的人都可能会发现以下讨论有用:

如果使用具有大对象的枚举,Parallel.ForEach可能会导致"内存不足"异常

并行操作批处理

usr*_*usr 8

我不完全理解如何Parallel.ForEach拉项目,但我认为默认情况下它会拉出多个以节省锁定开销.这意味着多个项目可能在内部排队Parallel.ForEach.这可能会导致OOM快速,因为您的项目非常大.

您可以尝试给它一个Partitioner返回单个项目.

如果这没有帮助,我们需要深入挖掘.调试内存问题Parallel和PLINQ是令人讨厌的.例如,其中一个中存在错误,导致旧项目无法快速释放.

作为解决方法,您可以在处理后清除列表.这将至少允许在处理完成后确定性地回收所有项目.

关于您发布的代码:它是干净的,高质量的,您遵守高标准的资源管理.我不会怀疑你的内存或资源泄漏.这仍然不是不可能的.你可以通过注释掉里面的代码Parallel.ForEach并用一个替换它来测试它Thread.Sleep(1000 * 60).如果泄漏仍然存在,那你就没有错.

根据我的经验,PLINQ更容易获得精确的并行度(因为当前版本使用您指定的精确DOP,永远不会更少).像这样:

GetRows()
.AsBatches(10000)    
.AsParallel().WithDegreeOfParallelism(8)
.Select(TransformItems) //generate rows to write
.AsEnumerable() //leave PLINQ
.SelectMany(x => x) //flatten batches
.AsBatches(1000000) //create new batches with different size
.AsParallel().WithDegreeOfParallelism(2) //PLINQ with different DOP
.ForEach(WriteBatchToDB); //write to DB
Run Code Online (Sandbox Code Playgroud)

这将为您提供一个从DB中提取的简单管道,使用针对CPU优化的特定DOP进行CPU绑定工作,并使用更大批量和更少DOP写入数据库.

这非常简单,它应该使用各自的DOP独立地最大化CPU和磁盘.玩DOP号码.