理解TPL数据流并行度排序度

Sim*_*ays 5 c# task-parallel-library tpl-dataflow

我正在阅读Dataflow(任务并行库),有一部分说:

如果指定的最大并行度大于1,则会同时处理多个消息,因此,可能无法按接收顺序处理消息.但是,从块输出消息的顺序将被正确排序.

这是什么意思?

例如,我设置了具有并行度= 5的动作块:

testActionBlock = new ActionBlock<int>(i => Consumer(i),
            new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 5
            });

await Producer();
testActionBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

我的Producer()基本上将数字排队到块中:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await testActionBlock.SendAsync(i);
    }
    testActionBlock.Complete();
}
Run Code Online (Sandbox Code Playgroud)

而我的消费者(i)只写出了这些内容:

private async Task Consumer(int i)
{
    if (i == 1)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine(i);
}
Run Code Online (Sandbox Code Playgroud)

这是否意味着消费者(2)将被阻止,直到消费者(1)完成处理(因为有5秒的延迟)?我测试了代码,但似乎并非如此.即使我删除了5秒延迟,我也看不到输出是否正常.

[更新]

bBlock = new BufferBlock<int>(option);

testActionBlock = new ActionBlock<int>(i => Consumer(i),
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 5
    });

bBlock.LinkTo(testActionBlock);

await Producer();
testActionBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

我的Producer()现在将添加到bBlock:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await bBlock.SendAsync(i);
    }
    bBlock.Complete();
}
Run Code Online (Sandbox Code Playgroud)

因此,在这种情况下,消费者(1)将等待5秒,然后消费者(2)才能继续?

Tim*_*Tim 3

不。您可以将 DoP 视为线程(不完全是,但很容易想到)

所以在 5 时,它将尝试一次处理 5 个。由于#1 需要 5 秒,#2 肯定会先完成。#3、#4 和 #5 可能也会如此。甚至可能是#6(因为#2 已经完成,DoP 将允许它从#6 开始)

即使没有延迟,也无法保证处理顺序。因此,永远不要依赖他们执行的命令。话虽如此,当您使用消息输出(不是打印,因为这是它们执行的顺序)时,它们将按照它们进入的顺序重新排序,即使它们以任意顺序执行。