Ste*_*ens 12 .net c# task-parallel-library tpl-dataflow
我试图在TPL Dataflow块中完成"完成".特别是,TransformBlock似乎没有完成.为什么?
我的代码计算从1到1000的所有整数的平方.我使用了a BufferBlock和a TransformBlock.稍后在我的代码中,我等待完成TransformBlock.该块永远不会实际完成,我不明白为什么.
static void Main(string[] args)
{
var bufferBlock = new BufferBlock<int>();
var calculatorBlock = new TransformBlock<int, int>(i =>
{
Console.WriteLine("Calculating {0}²", i);
return (int)Math.Pow(i, 2);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
foreach (var number in Enumerable.Range(1, 1000))
{
bufferBlock.Post(number);
}
bufferBlock.Complete();
// This line never completes
calculatorBlock.Completion.Wait();
// Unreachable code
IList<int> results;
if (calculatorBlock.TryReceiveAll(out results))
{
foreach (var result in results)
{
Console.WriteLine("x² = {0}", result);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
起初我以为我创造了一个死锁情况,但这似乎不是真的.当我calculatorBlock.Completion在调试器中检查任务时,其Status属性设置为WaitingForActivation.那是我的大脑蓝色筛选的那一刻.
您的管道挂起的原因是,无论BufferBlock和TransformBlock显然没有完成,直到他们清空物品(我猜的期望行为本身IPropagatorBlock小号,虽然我还没有发现的文件就可以了).
这可以通过一个更简单的示例进行验证:
var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)
除非您bufferBlock.Receive();在完成之前添加,否则无限期阻止.
如果您在通过TryReceiveAll代码块阻止之前从管道中删除项目,将另一个项目连接ActionBlock到管道,将您转换TransformBlock为ActionBlock或者任何其他方式将不再阻止.
关于你的特定解决方案,似乎你不需要一个BufferBlock或根本不需要TransformBlock,因为块有自己的输入队列,你不使用的返回值TransformBlock.这可以通过以下方式实现ActionBlock:
var block = new ActionBlock<int>(
i =>
{
Console.WriteLine("Calculating {0}²", i);
Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
},
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 8});
foreach (var number in Enumerable.Range(1, 1000))
{
block.Post(number);
}
block.Complete();
block.Completion.Wait();
Run Code Online (Sandbox Code Playgroud)
我想我现在明白了.TransformBlock在满足以下条件之前,实例不被视为"完整":
TransformBlock.Complete() 被称为InputCount == 0 - 块已将其转换应用于每个传入元素OutputCount == 0 - 所有变换后的元素都离开了输出缓冲区在我的程序中,没有链接到源的目标块TransformBlock,因此源块永远不会刷新其输出缓冲区.
作为一种解决方法,我添加了第二个BufferBlock用于存储转换元素的方法.
static void Main(string[] args)
{
var inputBufferBlock = new BufferBlock<int>();
var calculatorBlock = new TransformBlock<int, int>(i =>
{
Console.WriteLine("Calculating {0}²", i);
return (int)Math.Pow(i, 2);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
var outputBufferBlock = new BufferBlock<int>();
using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
foreach (var number in Enumerable.Range(1, 1000))
{
inputBufferBlock.Post(number);
}
inputBufferBlock.Complete();
calculatorBlock.Completion.Wait();
IList<int> results;
if (outputBufferBlock.TryReceiveAll(out results))
{
foreach (var result in results)
{
Console.WriteLine("x² = {0}", result);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)