BufferBlock and ActionBlock with BoundedCapacity do not use max DOP

Mic*_*tov 5 .net c# task-parallel-library async-await tpl-dataflow

I have this code:

var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);

    await Task.Delay(1000);

    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = -1
});

data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 3; id++)
{
    Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
    data.SendAsync(id).Wait();
    Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}

data.Complete();

Task.WhenAll(data.Completion, action.Completion).Wait();

This code gives me the following output:

[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End

Why doesn't the ActionBlock process items in parallel, even though it has an unbounded DOP?

i3a*_*non 7

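The ActionBlock appears to have a limited degree of parallelism because it has a BoundedCapacity of 1. BoundedCapacity (unlike InputCount) includes the items that are being processed at the moment. This is easy to demonstrate: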

var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

await block.SendAsync(4); // Accepted: the block is now at capacity
await block.SendAsync(4); // Never completes: the first item is still being processed

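This means that even though you set MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, the block cannot accept more than one item at a time, which in practice limits the degree of parallelism to 1.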
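The same behavior can be observed without hanging the program by using Post, which returns false when the block declines an item. The following is a minimal sketch, not part of the original answer: the class name BoundedCapacityDemo, the method PostTwiceAsync and the gate task are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Posts two items while the handler is still "in progress" and
    // reports whether each Post call was accepted by the block.
    public static async Task<(bool First, bool Second)> PostTwiceAsync(int boundedCapacity)
    {
        // Hypothetical gate: every handler stays pending until it is released.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        Task gateTask = gate.Task;

        var block = new ActionBlock<int>(_ => gateTask, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        bool first = block.Post(1);   // takes the first capacity slot
        bool second = block.Post(2);  // declined if no slot is free

        gate.SetResult(true);         // let the pending handlers finish
        block.Complete();
        await block.Completion;
        return (first, second);
    }

    public static async Task Main()
    {
        var one = await PostTwiceAsync(1);
        var two = await PostTwiceAsync(2);
        Console.WriteLine("BoundedCapacity = 1: first={0}, second={1}", one.First, one.Second);
        Console.WriteLine("BoundedCapacity = 2: first={0}, second={1}", two.First, two.Second);
    }
}
```

With BoundedCapacity = 1 the second Post should be declined, because the first item still occupies the only slot while its handler is pending; with BoundedCapacity = 2 both should be accepted.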

You can fix this by setting a larger BoundedCapacity:

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
    await Task.Delay(1000);
    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
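To check the effect of a larger BoundedCapacity, one option is to count how many handlers overlap. The sketch below is an illustration under stated assumptions, not code from the original answer: PeakConcurrencyDemo and MeasurePeakAsync are hypothetical names, and the 200 ms delay is an arbitrary stand-in for real work.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeakConcurrencyDemo
{
    // Sends itemCount items and returns the highest number of handlers
    // that were observed running at the same time.
    public static async Task<int> MeasurePeakAsync(int boundedCapacity, int itemCount)
    {
        var sync = new object();
        int running = 0, peak = 0;

        var action = new ActionBlock<int>(async id =>
        {
            lock (sync) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(200); // arbitrary stand-in for real work
            lock (sync) { running--; }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        for (var id = 1; id <= itemCount; id++)
        {
            // Waits asynchronously until the block has a free slot.
            await action.SendAsync(id);
        }

        action.Complete();
        await action.Completion;
        return peak;
    }

    public static async Task Main()
    {
        Console.WriteLine("BoundedCapacity = 1:  peak = {0}", await MeasurePeakAsync(1, 3));
        Console.WriteLine("BoundedCapacity = 10: peak = {0}", await MeasurePeakAsync(10, 3));
    }
}
```

With BoundedCapacity = 1 the peak cannot exceed 1, since the block never holds more than one item. With BoundedCapacity = 10 the three handlers are expected to overlap, although the exact peak depends on thread pool scheduling.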