如何通过在原始序列的值上运行任务来创建Rx序列?

Dan*_*mov 5 .net c# asynchronous task-parallel-library system.reactive

我有一个类型序列IObservable<T>和一个映射T, CancellationToken到a 的函数Task<U>.什么是最简洁的方式来IObservable<U>摆脱他们?

我需要以下语义:

  • 上一个项目的任务完成后,每个任务都会启动
  • 如果任务已被取消或出现故障,则会跳过该任务
  • 严格保留原始序列的顺序

这是我看到的签名:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector
);
Run Code Online (Sandbox Code Playgroud)

我还没有写任何代码,但我会除非有人打败我.
在任何情况下,我都不熟悉运营商Window,所以我的解决方案可能不那么优雅.

我需要C#4中的解决方案,但为了比较,也欢迎C#5答案.


如果你很好奇,下面是我的真实场景,或多或少:

Dropbox.GetImagesRecursively ()
    .ObserveOn (SynchronizationContext.Current)
    .Select (DownloadImage)
    .Subscribe (AddImageToFilePicker);
Run Code Online (Sandbox Code Playgroud)

Dan*_*mov 3

到目前为止,这似乎对我有用:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector)
{
    return source
        .Select (item => 
            Observable.Defer (() => 
                Observable.StartAsync (ct => selector (item, ct))
                    .Catch (Observable.Empty<U> ())
            ))
        .Concat ();
}
Run Code Online (Sandbox Code Playgroud)

我们将一个基于延迟任务的异常吞咽可观察值映射到每个项目,然后将它们连接起来。


我的思考过程是这样的。

我注意到其中一个SelectMany重载几乎完全符合我的要求,甚至具有完全相同的签名。但它并没有满足我的需求:

  • 它会在原始项目出现时创建任务,而我需要等待每个任务完成
  • 它不提供跳过已取消和有故障的任务的选项

我查看了这个重载的实现,发现它用于FromAsync处理任务创建和取消:

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
    return SelectMany_<TSource, TTaskResult, TResult> (
        source,
        x => FromAsync (ct => taskSelector (x, ct)),
        resultSelector
    );
}
Run Code Online (Sandbox Code Playgroud)

我转而查看FromAsync它是如何实现的,并惊喜地发现它也是可组合的:

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
    return Defer (() => StartAsync (functionAsync));
}
Run Code Online (Sandbox Code Playgroud)

我重复使用了DeferStartAsync,同时还添加Catch了吞咽错误。Defer和的组合Concat可确保任务相互等待并按原始顺序启动。