TPL Dataflow - 非常快的生产者,没有那么快的消费者OutOfMemory异常

Mic*_*ael 2 .net c# producer-consumer tpl-dataflow

在将TPL Dataflow移植到我的生产代码之前,我正在尝试使用它.生产代码是传统的生产者/消费者系统 - 生产者生成消息(与金融领域相关),消费者处理这些消息.

我感兴趣的是,如果在某些时候生产者的生产速度比消费者能够处理它的速度快得多(系统会爆炸,或者会发生什么),那么环境将会保持稳定,更重要的是在这些情况下该怎么做.

因此,为了尝试类似的简单应用程序,我想出了以下内容.

    var bufferBlock = new BufferBlock<Item>();

    var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
                        {
                            MaxDegreeOfParallelism = Environment.ProcessorCount
                            ,
                            BoundedCapacity = 100000
                        };

        var dataFlowLinkOptions = new DataflowLinkOptions
                        {
                            PropagateCompletion = true
                        };

        var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
 executiondataflowBlockOptions);

            bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
            for (int i = 0; i < int.MaxValue; i++)
            {
                 bufferBlock.SendAsync(GenerateItem());
            }

            bufferBlock.Complete();
            Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)

Item 是一个非常简单的课程

internal class Item
    {
        public Item(string itemId)
        {
            ItemId = itemId;
        }

        public string ItemId { get; }
    }
Run Code Online (Sandbox Code Playgroud)

GenerateItem 只是新闻 Item

static Item GenerateItem()
{
   return new Item(Guid.NewGuid().ToString());
}
Run Code Online (Sandbox Code Playgroud)

现在,模仿不那么快的消费者 - 我ProcessItem努力坚持100ms.

static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}
Run Code Online (Sandbox Code Playgroud)

执行此操作会导致20秒左右的OOM异常.

然后我继续添加更多的消费者(更多的ActionBlocks,最多10个),这会赢得更多的时间,但最终会导致相同的OOM异常.

我也注意到GC面临着巨大的压力(VS 2015诊断工具显示GC几乎一直在运行),所以我介绍了对象池(非常简单,基本上它是ConcurrentBag存储项目)Item,但我仍然打同一面墙(抛出OOM异常).

提供内存中的内容的一些细节,为什么它已经用完了.

  • 最大尺寸的对象类型为SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item>&ConcurrentQueue+Segment<TplDataFlow.Item>
  • 我看到BufferBlock的是InputBuffer充满了Items(Count = 14,562,296)
  • 由于我设置BoundedCapacityActionBlock(S),其输入缓冲器还靠近所述可配置数目(InputCount = 99996)

为了确保较慢的生产者能够让消费者跟上,我让生产者在迭代之间睡觉:

for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}
Run Code Online (Sandbox Code Playgroud)

它工作正常 - 没有异常被抛出,内存使用率一直很低,我不再看到任何GC压力.

所以我几乎没有问题

  1. 在尝试使用TPL Dataflow构建块重现非常快速的生产者/慢速消费者场景时,我是否在做任何本质上错误的事情
  2. 有没有办法使这项工作,而不是失败与OOM异常.
  3. 关于如何在TPL Dataflow上下文中处理这种场景(非常快的生产者/慢速消费者)的最佳实践的任何评论/链接.
  4. 我对这个问题的理解是 - 由于消费者无法跟上,BufferBlock内部缓冲区很快就会充满消息,并且在一些消费者回来请求下一条消息之后会暂停消息,因为应用程序用完了内存(由于填充内部缓冲区BufferBlock) - 你会同意吗?

我正在使用Microsoft.Tpl.Dataflow包-version 4.5.24..NET 4.5(C#6).进程是32位.

Ste*_*ary 8

您已经很好地识别了问题:BufferBlock填充其输入缓冲区直到它到达OOM.

要解决此问题,您还应该BoundedCapacity为缓冲区块添加一个选项.这将自动为您量身Thread.Sleep定制生产者(不需要生产者).