使用LinkTo Predicate的TPL数据流阻止

jmi*_*has 3 c# unit-testing task-parallel-library

我有一些块最终从TransformBlock转到基于LinkTo谓词的其他三个转换块之一.我正在使用DataflowLinkOptions传播完成.问题是当一个谓词被满足并且该块被启动时,我的其余管道继续.似乎管道应该等待此块首先完成.

这个代码是这样的:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status = Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status = Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status = Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);
Run Code Online (Sandbox Code Playgroud)

现在,这不像我所说的那样工作,所以我发现获得我想要的行为的唯一方法是取出linkOptions并将以下内容添加到mainBlock的lambda中.

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    if (input.Status = Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status = Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status = Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});
Run Code Online (Sandbox Code Playgroud)

所以问题是,这是让这个工作的唯一方法吗?

顺便说一句,这已经在我的单元测试中运行,其中一个数据项通过它来尝试调试管道行为.每个块都经过多次单元测试单独测试.那么在我的管道单元测试中发生的事情就是在块执行完毕之前命中了assert,因此失败了.

如果我删除block2和block3链接并使用linkOptions调试测试,它可以正常工作.

svi*_*ick 6

您的问题不在于您的问题中的代码,它正常工作:当主程序段完成时,所有三个后续程序块都标记为完成.

问题在于结束块:您也在PropagateCompletion那里使用,这意味着当前三个块中的任何一个块完成时,结束块被标记为完成.你想要的是在完成所有三个块并且Task.WhenAll().ContinueWith()你的答案中的组合完成时将它标记为完成(虽然该片段的第一部分是不必要的,但这完全相同PropagateCompletion).

事实证明,链接选项传播(至少这是我的猜测)将传播不满足linkTo中谓词的块的完成.

是的,它总是传播完成.完成没有与之关联的任何项目,因此将谓词应用于它是没有任何意义的.也许你总是只有一个项目(这并不常见),这让你更加困惑?

如果我的猜测是正确的,我觉得这是链接选项完成传播中的错误或设计错误.如果块从未使用过,为什么要完成?

为什么不呢?对我而言,这非常有意义:即使Status.Delayed此时没有任何项目,您仍然希望完成处理这些项目的块,以便任何后续代码都可以知道所有延迟项目已经处理完毕.事实上没有任何关系并不重要.


无论如何,如果经常遇到这种情况,您可能希望创建一个辅助方法,同时将多个源块链接到单个目标块并正确传播完成:

public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}
Run Code Online (Sandbox Code Playgroud)

用法:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);
Run Code Online (Sandbox Code Playgroud)


jmi*_*has 0

好的。所以我首先要感谢科里。当我第一次读到他的评论时,我有点恼火,因为我觉得我的代码很好地说明了这个概念,并且可以轻松地变成工作版本。但无论如何,由于他的评论,我觉得有必要做一个可以发布的完整的可测试版本。

在我的测试中,令人惊讶的部分是,尽管它模仿了我的真实代码,但我认为会失败的路径却通过了,而我认为会通过的路径却失败了。这让我有点头晕。所以我开始对原始代码进行更多的排列。基本上我创建了同步块和异步块并制作了两种管道。总共四个,2 个同步和 2 个异步,每个使用链接选项进行传播,另一个使用 MainBlock 中的完成任务,如图所示。

在向异步任务添加一些任务延迟后,我发现同步版本通过了测试,而异步版本失败了。

所以,问题的最终解决方案不是以上任何一种。事实证明,链接选项传播(至少这是我的猜测)将传播不满足 linkTo 中谓词的块的完成。因此,当状态为“完成”的事物下降时,它会进入 Block1。

哦,我应该指出,在完整的测试代码中,我将所有块 1,2 和 3 连接到同一个 EndBlock,这在原始代码中没有显示。

不管怎样,在满足谓词并且事物进入区块 1 后,我相信区块 2 和 3 已经完成。这会导致 EndBlock 完成我们在单元测试中等待的任务,并且 Assert 失败,因为 Block1 尚未完成其工作。

如果我的猜测是正确的,我觉得这是链接选项完成传播中的错误或设计错误。如果一个块从未被使用过,为什么它应该是完整的?

所以,这就是我为解决问题所做的事情。我取出链接选项并手动连接完成事件。像这样:

MainBlock.Completion.ContinueWith(t =>
{
Block1.Complete();
Block2.Complete();
Block3.Complete();
});

Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
.ContinueWith(t =>
{
    EndBlock.Complete();
});
Run Code Online (Sandbox Code Playgroud)

这工作得很好,当转移到我的真实代码时也工作得很好。Task.WhenAll 让我相信未使用的块被设置为完成以及为什么自动传播是问题所在。

我希望这可以帮助别人。当我发布所有测试代码时,我会回来添加一个链接。

编辑:这是测试代码要点的链接https://gist.github.com/jmichas/bfab9cec84f0d1e40e12