TransformBlock永远不会完成

Ste*_*ens 12 .net c# task-parallel-library tpl-dataflow

我试图在TPL Dataflow块中完成"完成".特别是,TransformBlock似乎没有完成.为什么?

示例程序

我的代码计算从1到1000的所有整数的平方.我使用了a BufferBlock和a TransformBlock.稍后在我的代码中,我等待完成TransformBlock.该块永远不会实际完成,我不明白为什么.

static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            bufferBlock.Post(number);
        }

        bufferBlock.Complete();

        // This line never completes
        calculatorBlock.Completion.Wait();

        // Unreachable code
        IList<int> results;
        if (calculatorBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

起初我以为我创造了一个死锁情况,但这似乎不是真的.当我calculatorBlock.Completion在调试器中检查任务时,其Status属性设置为WaitingForActivation.那是我的大脑蓝色筛选的那一刻.

i3a*_*non 9

您的管道挂起的原因是,无论BufferBlockTransformBlock显然没有完成,直到他们清空物品(我猜的期望行为本身IPropagatorBlock小号,虽然我还没有发现的文件就可以了).

这可以通过一个更简单的示例进行验证:

var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

除非您bufferBlock.Receive();在完成之前添加,否则无限期阻止.

如果您在通过TryReceiveAll代码块阻止之前从管道中删除项目,将另一个项目连接ActionBlock到管道,将您转换TransformBlockActionBlock或者任何其他方式将不再阻止.


关于你的特定解决方案,似乎你不需要一个BufferBlock或根本不需要TransformBlock,因为块有自己的输入队列,你不使用的返回值TransformBlock.这可以通过以下方式实现ActionBlock:

var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 8});
foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}
block.Complete();
block.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)

  • 该文档只是这样说:_在数据流块上调用Complete之后,该块将完成,并且其Completion任务在处理完所有先前可用的数据后将进入最终状态。_我认为“已处理”的意思是“转化”。我永远都不会猜到它的意思是“转化并接受”。 (2认同)

Ste*_*ens 7

我现在明白了.TransformBlock在满足以下条件之前,实例不被视为"完整":

  1. TransformBlock.Complete() 被称为
  2. InputCount == 0 - 块已将其转换应用于每个传入元素
  3. OutputCount == 0 - 所有变换后的元素都离开了输出缓冲区

在我的程序中,没有链接到源的目标块TransformBlock,因此源块永远不会刷新其输出缓冲区.

作为一种解决方法,我添加了第二个BufferBlock用于存储转换元素的方法.

static void Main(string[] args)
{
    var inputBufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    var outputBufferBlock = new BufferBlock<int>();
    using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            inputBufferBlock.Post(number);
        }

        inputBufferBlock.Complete();
        calculatorBlock.Completion.Wait();

        IList<int> results;
        if (outputBufferBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)