Dim*_*tri 14 .net c# task-parallel-library tpl-dataflow
我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制).
svi*_*ick 19
如果您只想将项目从一个块转发到其他几个块,则不需要BufferBlock
.
但肯定存在一些有用的情况.例如,如果您有一个复杂的数据流网络,您可能希望从较小的子网络构建它,每个子网络都使用自己的方法创建.要做到这一点,你需要一些方法来表示一组块.在您提到的情况下,从方法返回该单个BufferBlock
(可能是ITargetBlock
)将是一个简单的解决方案.
另一个BufferBlock
有用的示例是,您希望将项目从多个源块发送到多个目标块.如果您用作BufferBlock
中介,则不必将每个源块连接到每个目标块.
我相信你还可以使用其他许多例子BufferBlock
.当然,如果你没有看到任何理由在你的情况下使用它,那么不要.
Vot*_*fee 19
要添加到svick的答案,缓冲区块还有另一个好处.如果您有一个具有多个输出链接的块并希望在它们之间取得平衡,则必须将输出块转为非贪婪并添加缓冲区块来处理排队.我发现以下示例很有用:
引用现在已经死亡的链接:
这是我们计划做的事情:
注意,BufferBlock不会将输入数据的副本切换到它链接到的所有目标块.而是仅对一个目标块执行此操作.这是我们期望当一个目标忙于处理请求时.它将被传递到了另一个目标.现在让我们参考下面的代码:
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
});
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
});
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
});
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
}
);
t.Start();
Console.Read();
}
Run Code Online (Sandbox Code Playgroud)
执行时会产生以下输出:
这表明只有一个目标实际上正在执行所有数据,即使它很忙(由于Thread.Sleep(100)有目的地添加).为什么?
这是因为所有目标块本质上都是贪婪的,并且即使在它们无法处理数据时也会缓冲输入.要改变这种行为,我们在DataFlowBlockOptions中将Greedy属性设置为false,同时初始化ActionBlock,如下所示.
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: 1,
cancellationToken: CancellationToken.None,
//Not Greedy
greedy: false));
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
cancellationToken: CancellationToken.None,
greedy: false));
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
}
, new DataflowBlockOptions(taskScheduler: TaskScheduler.Default,
maxDegreeOfParallelism: 1, maxMessagesPerTask: -1,
cancellationToken: CancellationToken.None,
greedy: false));
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
});
t.Start();
Console.Read();
}
Run Code Online (Sandbox Code Playgroud)
该程序的输出是:
这显然是三个ActionBlock中数据的分布如预期的那样.
小智 5
不,由于多种原因,第二个示例无法编译:只能为“分组”数据流块设置 greedy=false - 不能为执行块设置;然后它必须通过 GroupingDataflowBlockOptions 设置 - 而不是 DataflowBlockOptions;然后将其设置为属性值“{ Greedy = false }”而不是构造函数参数。
如果您想限制操作块的容量,请通过设置 DataflowBlockOptions 的 BoundedCapacity 属性的值来实现(尽管正如 OP 所述,他们已经知道此选项)。像这样:
var a1 = new ActionBlock<int>(
i => doSomeWork(i),
new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
);
Run Code Online (Sandbox Code Playgroud)