TPL Dataflow和Rx组合示例

nae*_*n84 12 .net task-parallel-library system.reactive c#-5.0 tpl-dataflow

我只是想学习它们以及如何一起使用它们.我知道他们可以相互补充我只是找不到某人实际做的例子.

Eni*_*ity 19

让我先从一些背景开始.

在.NET框架具有许多特殊类型的-或者适当类或接口- ,Task<T>,IObservable<T>,Nullable<T>,IEnumerable<T>,Lazy<T>等-提供特殊权力为基础类型T.

TPL用于Task<T>表示单个值的异步计算T.

Rx用于IObservable<T>表示零或更多值的异步计算T.

这是将TPL和Rx结合在一起的"异步计算"方面.

现在,TPL也使用类型Task来表示的异步执行Action拉姆达,但是这可以被认为是一种特殊的情况Task<T>,其中Tvoid.非常像c#中的标准方法返回void如下:

public void MyMethod() { }
Run Code Online (Sandbox Code Playgroud)

Rx也允许使用称为的特殊类型的相同特殊情况Unit.

TPL和Rx之间的区别在于返回的值的数量.TPL是唯一的,而Rx是零或更多.

因此,如果您通过仅使用返回单个值的可观察序列来以特殊方式处理Rx,则可以以与TPL类似的方式进行一些计算.

例如,在TPL中我可以写:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));
Run Code Online (Sandbox Code Playgroud)

在Rx中,等效的是:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

我可以在Rx中更进一步,指定应该使用TPL执行计算,如下所示:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

(默认使用线程池.)

现在我可以做一些"混合和匹配".如果我添加对System.Reactive.Threading.Tasks命名空间的引用,我可以很容易地在任务和可观察对象之间移动.

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));
Run Code Online (Sandbox Code Playgroud)

注意ToObservable()&.ToTask()调用和从一个库到另一个库的结果翻转.

如果我有一个返回多个值的observable,我可以使用observable .ToArray()扩展方法将多个序列值转换为可以转换为任务的单个数组值.像这样:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));
Run Code Online (Sandbox Code Playgroud)

我认为这是对你的问题的一个相当基本的答案.这是你期待的吗?

  • TPL Dataflow是一个与TPL截然不同的库,所以我觉得答案并没有准确地解决这个问题.然而,讨论值得注意,所以+1. (19认同)
  • 对不起,但正如GregC之前所说,我需要一个涉及TPL Dataflow的例子,而不仅仅是"TPL".我想要的是结合TPL数据流块和Rx. (2认同)