我有TransformManyBlock
以下设计:
我在一个巨大的文件(61GB)上运行这个块,这个文件太大而无法放入RAM中.为了避免无限制的内存增长,我BoundedCapacity
为这个块和所有下游块设置了一个非常低的值(例如1).尽管如此,该块显然会贪婪地迭代IEnumerable,它消耗了计算机上的所有可用内存,使每个进程停止运行.在我杀死进程之前,块的OutputCount继续无限制地上升.
我该怎么做才能防止块IEnumerable
以这种方式消耗?
编辑:这是一个示例程序,说明了问题:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static IEnumerable<string> GetSequence(char c)
{
for (var i = 0; i < 1024 * 1024; ++i)
yield return new string(c, 1024 * 1024);
}
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
var secondBlock = new ActionBlock<string>(str …
Run Code Online (Sandbox Code Playgroud) 我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式.我有大数据流网格,其中有大约40个块.网格中有两个主要功能部分:生产者部分和消费者部分.生产者应该继续为消费者提供大量工作,而消费者有时会缓慢地处理传入的工作.当消费者忙于一些指定数量的工作项时,我想暂停生产者.否则,该应用程序会占用大量内存/ CPU,并且行为不可持续.
我制作了演示应用程序以演示此问题:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false
};
var boundedOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
EnsureOrdered = false,
BoundedCapacity = 5
};
var bufferBlock = new BufferBlock<int>(boundedOptions);
var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
var broadcastBlock = new BroadcastBlock<int>(x => x, options);
var consumerBlock = new …
Run Code Online (Sandbox Code Playgroud)