在数据流网络中使用BufferBlock <T>的好处

Dim*_*tri 14 .net c# task-parallel-library tpl-dataflow

我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制).

svi*_*ick 19

如果您只想将项目从一个块转发到其他几个块,则不需要BufferBlock.

但肯定存在一些有用的情况.例如,如果您有一个复杂的数据流网络,您可能希望从较小的子网络构建它,每个子网络都使用自己的方法创建.要做到这一点,你需要一些方法来表示一组块.在您提到的情况下,从方法返回该单个BufferBlock(可能是ITargetBlock)将是一个简单的解决方案.

另一个BufferBlock有用的示例是,您希望将项目从多个源块发送到多个目标块.如果您用作BufferBlock中介,则不必将每个源块连接到每个目标块.

我相信你还可以使用其他许多例子BufferBlock.当然,如果你没有看到任何理由在你的情况下使用它,那么不要.


Vot*_*fee 19

要添加到svick的答案,缓冲区块还有另一个好处.如果您有一个具有多个输出链接的块并希望在它们之间取得平衡,则必须将输出块转为非贪婪并添加缓冲区块来处理排队.我发现以下示例很有用:

引用现在已经死亡的链接:

这是我们计划做的事情:

  • 一些代码块将使用Post(T t)方法将数据发布到BufferBlock.
  • 此BufferBlock使用BufferBlock的LinkTo t)方法链接到3个ActionBlock实例.

注意,BufferBlock不会将输入数据的副本切换到它链接到的所有目标块.而是仅对一个目标块执行此操作.这是我们期望当一个目标忙于处理请求时.它将被传递到了另一个目标.现在让我们参考下面的代码:

static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();

    ActionBlock<int> a1 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(100);
        Console.WriteLine("Action A1 executing with value {0}", a);
    });

    ActionBlock<int> a2 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A2 executing with value {0}", a);
    });

    ActionBlock<int> a3 = new ActionBlock<int>(a =>
    {
        Thread.Sleep(50);
        Console.WriteLine("Action A3 executing with value {0}", a);
    });

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
        {
            int i = 0;
            while (i < 10)
            {
                Thread.Sleep(50);
                i++;
                bb.Post(i);
            }
        }
    );

    t.Start();
    Console.Read();
}
Run Code Online (Sandbox Code Playgroud)

执行时会产生以下输出:

  • 动作A1以值1执行
  • 动作A1以值2执行
  • 动作A1以值3执行
  • 动作A1以值4执行
  • 动作A1以值5执行
  • 动作A1以值6执行
  • 动作A1以值7执行
  • 动作A1以值8执行
  • 动作A1以值9执行
  • 动作A1以值10执行

这表明只有一个目标实际上正在执行所有数据,即使它很忙(由于Thread.Sleep(100)有目的地添加).为什么?

这是因为所有目标块本质上都是贪婪的,并且即使在它们无法处理数据时也会缓冲输入.要改变这种行为,我们在DataFlowBlockOptions中将Greedy属性设置为false,同时初始化ActionBlock,如下所示.

static void Main(string[] args)
{
    BufferBlock<int> bb = new BufferBlock<int>();
    ActionBlock<int> a1 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(100);
            Console.WriteLine("Action A1 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
            cancellationToken: CancellationToken.None,
            //Not Greedy
            greedy: false));

    ActionBlock<int> a2 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A2 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
            cancellationToken: CancellationToken.None,
            greedy: false));
    ActionBlock<int> a3 = new ActionBlock<int>(a =>
        {
            Thread.Sleep(50);
            Console.WriteLine("Action A3 executing with value {0}", a);
        }
        , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
            maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
            cancellationToken: CancellationToken.None,
            greedy: false));

    bb.LinkTo(a1);
    bb.LinkTo(a2);
    bb.LinkTo(a3);

    Task t = new Task(() =>
    {
        int i = 0;
        while (i < 10)
        {
            Thread.Sleep(50);
            i++;
            bb.Post(i);
        }
    });

    t.Start();
    Console.Read();
}
Run Code Online (Sandbox Code Playgroud)

该程序的输出是:

  • 动作A1以值1执行
  • 动作A2以值3执行
  • 动作A1以值2执行
  • 动作A3以值6执行
  • 动作A3以值7执行
  • 动作A3以值8执行
  • 动作A2以值5执行
  • 动作A3以值9执行
  • 动作A1以值4执行
  • 动作A2以值10执行

这显然是三个ActionBlock中数据的分布如预期的那样.

  • 无法获得第二个示例进行编译。 (2认同)

小智 5

不,由于多种原因,第二个示例无法编译:只能为“分组”数据流块设置 greedy=false - 不能为执行块设置;然后它必须通过 GroupingDataflowBlockOptions 设置 - 而不是 DataflowBlockOptions;然后将其设置为属性值“{ Greedy = false }”而不是构造函数参数。

如果您想限制操作块的容量,请通过设置 DataflowBlockOptions 的 BoundedCapacity 属性的值来实现(尽管正如 OP 所述,他们已经知道此选项)。像这样:

var a1 = new ActionBlock<int>(
            i => doSomeWork(i), 
            new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
        );
Run Code Online (Sandbox Code Playgroud)