Dan*_*mov 5 .net c# asynchronous task-parallel-library system.reactive
我有一个类型序列IObservable<T>
和一个映射T, CancellationToken
到a 的函数Task<U>
.什么是最简洁的方式来IObservable<U>
摆脱他们?
我需要以下语义:
这是我看到的签名:
public static IObservable<U> Select<T, U> (
this IObservable<T> source,
Func<T, CancellationToken, Task<U>> selector
);
Run Code Online (Sandbox Code Playgroud)
我还没有写任何代码,但我会除非有人打败我.
在任何情况下,我都不熟悉运营商Window
,所以我的解决方案可能不那么优雅.
我需要C#4中的解决方案,但为了比较,也欢迎C#5答案.
如果你很好奇,下面是我的真实场景,或多或少:
Dropbox.GetImagesRecursively ()
.ObserveOn (SynchronizationContext.Current)
.Select (DownloadImage)
.Subscribe (AddImageToFilePicker);
Run Code Online (Sandbox Code Playgroud)
到目前为止,这似乎对我有用:
public static IObservable<U> Select<T, U> (
this IObservable<T> source,
Func<T, CancellationToken, Task<U>> selector)
{
return source
.Select (item =>
Observable.Defer (() =>
Observable.StartAsync (ct => selector (item, ct))
.Catch (Observable.Empty<U> ())
))
.Concat ();
}
Run Code Online (Sandbox Code Playgroud)
我们将一个基于延迟任务的异常吞咽可观察值映射到每个项目,然后将它们连接起来。
我的思考过程是这样的。
我注意到其中一个SelectMany
重载几乎完全符合我的要求,甚至具有完全相同的签名。但它并没有满足我的需求:
我查看了这个重载的实现,发现它用于FromAsync
处理任务创建和取消:
public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
return SelectMany_<TSource, TTaskResult, TResult> (
source,
x => FromAsync (ct => taskSelector (x, ct)),
resultSelector
);
}
Run Code Online (Sandbox Code Playgroud)
我转而查看FromAsync
它是如何实现的,并惊喜地发现它也是可组合的:
public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
return Defer (() => StartAsync (functionAsync));
}
Run Code Online (Sandbox Code Playgroud)
我重复使用了Defer
和StartAsync
,同时还添加Catch
了吞咽错误。Defer
和的组合Concat
可确保任务相互等待并按原始顺序启动。
归档时间: |
|
查看次数: |
746 次 |
最近记录: |