鉴于:
IObservable<T> src async Task F(T){...}.F只能按顺序调用.所以await F(x);await F(y);很好,但是Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...});错了,因为F(x)并且F(y)不能同时运行.我很清楚这await src.Do(F)是错误的,因为它会F同时运行.
我的问题是如何正确地做到这一点?
Observable.Do仅用于副作用,不用于顺序组合. SelectMany可能是你想要的.从Rx 2.0开始,有一些重载SelectMany可以很容易地组成observable Task<T>.(请注意这些和类似的Task/Observable coop运算符可能引入的额外并发性.)
var q = from value in src
from _ in F(value.X).AsVoidAsync() // See helper definition below
from __ in F(value.Y).AsVoidAsync()
select value;
Run Code Online (Sandbox Code Playgroud)
但是,基于您特别询问Do运算符的事实,我怀疑src可能包含多个值,并且您不希望为每个值重叠调用F. 在这种情况下,考虑到SelectMany实际上就是这样Select->Merge; 因此,你可能想要的是Select->Concat.
// using System.Reactive.Threading.Tasks
var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();
Run Code Online (Sandbox Code Playgroud)
不要忘了用Defer,因为F(x)是热的.
AsVoidAsync扩展:
IObservable<T>需要一个T,但Task代表无效,因此Rx的转换运算符要求我们Task<T>从a 得到一个Task.我倾向于使用Rx的System.Reactive.Unit结构T:
public static class TaskExtensions
{
public static Task<Unit> AsVoidAsync(this Task task)
{
return task.ContinueWith(t =>
{
var tcs = new TaskCompletionSource<Unit>();
if (t.IsCanceled)
{
tcs.SetCanceled();
}
else if (t.IsFaulted)
{
tcs.SetException(t.Exception);
}
else
{
tcs.SetResult(Unit.Default);
}
return tcs.Task;
},
TaskContinuationOptions.ExecuteSynchronously)
.Unwrap();
}
}
Run Code Online (Sandbox Code Playgroud)
或者,您可以始终只调用专门的ToObservable方法.
| 归档时间: |
|
| 查看次数: |
573 次 |
| 最近记录: |