如何表示我的数据流完成?

cho*_*d37 3 c# task-parallel-library tpl-dataflow

我有一个类实现了一个使用TPL Dataflow由3个步骤组成的数据流.

在构造函数中,我将步骤创建为TransformBlocks并使用LinkTo将其链接起来,并将DataflowLinkOptions.PropagateCompletion设置为true.该类公开了一个方法,通过在第一步调用SendAsync来启动工作流.该方法返回工作流程最后一步的"完成"属性.

目前,工作流中的步骤似乎按预期执行,但最终步骤永远不会完成,除非我明确地在其上调用Complete.但这样做会使工作流程短路并且没有执行任何步骤?我究竟做错了什么?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;

   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });

      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }

   public Task Push(object message) {
      step1.SendAsync(message);
      step1.Complete();
      return step3.Completion;
   }
}
...
public class Program {
  public static void Main(string[] args) {
    var pipeline = new MessagePipeline();
    var result = pipeline.Push("Hello, world!");
    result.ContinueWith(_ => Console.WriteLine("Completed"));
    Console.ReadLine();
  }
}
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 6

链接这些步骤时,需要传递DataflowLinkOptions,并将PropagateCompletion 属性设置为true以传播完成和错误.一旦你这样做,调用Complete()第一个块将propagete完成到下游块.

一旦块接收到完成事件,它就完成处理,然后通知其链接的下游目标.

这样您就可以将所有数据发布到第一步并致电Complete().最后一个块仅在所有上游块完成后才会完成.

例如,

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);

foreach(var message in messages)
{
    myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;
Run Code Online (Sandbox Code Playgroud)

默认情况下,PropagateCompletion不成立,因为在更复杂的场景中(例如非线性流或动态更改流),您不希望完成和错误自动传播.如果要在不终止整个流程的情况下处理错误,您可能还希望避免自动完成.

遥想当年TPL数据流在测试默认真实的,但是这是对RTM改变

UPDATE

代码永远不会完成,因为最后一步是TransformBlock没有链接目标来接收其输出.这意味着即使块收到完成信号,它也没有完成所有工作,也无法改变自己的完成状态.

将其更改为ActionBlock<object>删除问题.