我对TPL Dataflow这个话题很陌生.在C#中的Concurrency一书中,我测试了以下示例.我无法弄清楚为什么没有输出应该是2*2-2=2;
static void Main(string[] args)
{
//Task tt = test();
Task tt = test1();
Console.ReadLine();
}
static async Task test1()
{
try
{
var multiplyBlock = new TransformBlock<int, int>(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock,
new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(2);
await subtractBlock.Completion;
int temp = subtractBlock.Receive();
Console.WriteLine(temp);
}
catch (AggregateException e)
{
// The exception is caught here.
foreach (var v in e.InnerExceptions)
{
Console.WriteLine(v.Message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
Update1:我尝试了另一个例子.我仍然没有使用,Block.Complete()但我想当第一个块完成时,结果会自动传递到第二个块.
private static async Task test3()
{
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] {i, i + 1}; });
ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));
tmb.LinkTo(ab);
for (int i = 0; i < 4; i++)
{
tmb.Post(i);
}
//tmb.Complete();
await ab.Completion;
Console.WriteLine("Finished post");
}
Run Code Online (Sandbox Code Playgroud)
这部分代码:
await subtractBlock.Completion;
int temp = subtractBlock.Receive();
Run Code Online (Sandbox Code Playgroud)
首先(异步)等待减法块完成,然后尝试从块中检索输出.
有两个问题:源块永远不会完成,代码正在尝试从已完成的块中检索输出.一旦块完成,它将不再产生任何数据.
(我假设您指的是配方4.2中的示例,它将发布1,导致异常,从而使块处于故障状态).
因此,您可以通过完成源块来修复此测试(并且完成将沿着链接subtractBlock自动传播),并通过在(异步)等待subtractBlock完成之前读取输出:
multiplyBlock.Complete();
int temp = subtractBlock.Receive();
await subtractBlock.Completion;
Run Code Online (Sandbox Code Playgroud)