通过 BufferBlock 的背压不起作用。(C# TPL 数据流)

Dmi*_*mov 0 c# dataflow task-parallel-library tpl-dataflow

典型情况:生产者快,消费者慢,需要让生产者慢下来
无法按我预期工作的示例代码(解释如下):

//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});

bb.LinkTo(ab);

ab.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

我如何认为这段代码会起作用,但它不会:

  • BufferBlock bb是容量为 3 的阻塞队列。一旦达到容量,生产者应该无法使用.Post()它,直到有空位。
    • 不能那样工作。bb似乎很乐意接受任何数量的消息。
  • producer是一个快速发布消息的任务。发布所有消息后,调用bb.Complete()应通过管道传播,并在处理完所有消息后发出关闭信号。所以ab.Completion.Wait();在最后等待。
    • 也不行。一旦.Complete()被调用,动作块ab就不会再收到任何消息。

可以用 a 来完成BlockingCollection,我认为在 TPL Dataflow (TDF) 世界中BufferBlock是等价的。我想我误解了背压应该如何在 TPL Dataflow 中工作。

那么问题在哪里呢?如何运行此管道,缓冲区中不允许超过 3 条消息bb,并等待其完成?

PS:我发现了这个要点(https://gist.github.com/mnadel/df2ec09fe7eae9ba8938),建议维护一个信号量来阻止写入BufferBlock. 我认为这是“内置的”。

接受答案后更新:

接受答案后更新:

如果您正在查看这个问题,您需要记住它ActionBlock也有自己的输入缓冲区。

这是一个。然后您还需要意识到,因为所有块都有自己的输入缓冲区,所以您不需要它,因为BufferBlock您可能认为它的名称暗示了它。ABufferBlock更像是用于更复杂架构的实用程序块,或者更像是平衡加载块。但这不是背压缓冲区。

完成传播需要在链接级别明确定义。

调用时.LinkTo()需要显式地new DataflowLinkOptions {PropagateCompletion = true}作为第二个参数传递。

JSt*_*ard 5

要引入背压,您需要SendAsync在将项目发送到块中时使用。这允许您的生产者等待块为项目做好准备。像这样的东西就是你要找的:

class Program
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 3
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您将其更改为使用Post并打印结果,Post您将看到许多项目无法传递给块:

class Program
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            var result = block.Post(i);
            Console.WriteLine(result);
        }

        block.Complete();
        await block.Completion;
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

True
False
False
False
False
False
False
False
False
False
0
Run Code Online (Sandbox Code Playgroud)