TPL Dataflow块消耗所有可用内存

bri*_*rns 6 .net c# dataflow task-parallel-library tpl-dataflow

我有TransformManyBlock以下设计:

  • 输入:文件的路径
  • 输出:IEnumerable文件的内容,一次一行

我在一个巨大的文件(61GB)上运行这个块,这个文件太大而无法放入RAM中.为了避免无限制的内存增长,我BoundedCapacity为这个块和所有下游块设置了一个非常低的值(例如1).尽管如此,该块显然会贪婪地迭代IEnumerable,它消耗了计算机上的所有可用内存,使每个进程停止运行.在我杀死进程之前,块的OutputCount继续无限制地上升.

我该怎么做才能防止块IEnumerable以这种方式消耗?

编辑:这是一个示例程序,说明了问题:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您使用的是64位,请确保在Visual Studio中清除"首选32位"选项.我的计算机上有16GB的RAM,这个程序会立即占用每个可用的字节.

i3a*_*non 6

您似乎误解了 TPL 数据流的工作原理。

BoundedCapacity限制您可以发布到块中的项目数量。在您的情况下,这意味着单个char进入TransformManyBlock并且单个string进入ActionBlock.

因此,您将单个项目发布到TransformManyBlock,然后返回1024*1024字符串并尝试将它们传递给一次ActionBlock只接受一个的 。其余的字符串将位于TransformManyBlock的输出队列中。

您可能想要做的是创建单个块并通过等待(同步或其他方式)在达到容量时以流方式将项目发布到其中:

private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}
Run Code Online (Sandbox Code Playgroud)

  • @Bugmaster 这根本不是一个愚蠢的问题:http://stackoverflow.com/a/13605979/885318 (2认同)
  • @i3arnon:谢谢,我没有意识到 Post() 无论如何都会立即返回,我认为它会阻塞直到消息被消耗掉。哎呀! (2认同)