Ben*_*jol 10 c# task-parallel-library system.reactive
我正处于这样一种情况:我正在处理一系列任务(启用驱动器,更改位置,等待停止,禁用).
'wait for'监视一个IObservable<Status>
,我想等待(所以我可以通过它ContinueWith
和其他任务).
我开始了下列任务内的用户的OnNext处理,但那只是丑陋.我现在想出的是这种扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
var tcs = new TaskCompletionSource<T>();
source
.Where(pred)
.DistinctUntilChanged()
.Take(1) //OnCompletes the observable, subscription will self-dispose
.Subscribe(val => tcs.TrySetResult(val),
ex => tcs.TrySetException(ex),
() => tcs.TrySetCanceled());
return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)
(更新了svick的处理建议OnCompleted
和OnError
)
Where
和DistinctUntilChanged
正确的顺序?(我认为他们是)Eni*_*ity 11
至少我会将此扩展方法更改为:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
return
source
.Where(pred)
.DistinctUntilChanged()
.Take(1)
.ToTask();
}
Run Code Online (Sandbox Code Playgroud)
使用.ToTask()
比引入要好得多TaskCompletionSource
.您需要对System.Reactive.Threading.Tasks
命名空间的引用才能获得.ToTask()
扩展方法.
此外,DistinctUntilChanged
在此代码中是多余的.您只获得一个值,因此默认情况下它必须是不同的.
现在,我的下一个建议可能有点争议.这个扩展是一个坏主意,因为它隐藏了正在发生的事情的真实语义.
如果我有这两个代码的snippits:
var t = xs.WaitFor(x => x > 10);
Run Code Online (Sandbox Code Playgroud)
要么:
var t = xs.Where(x => x > 10).Take(1).ToTask();
Run Code Online (Sandbox Code Playgroud)
我通常更喜欢第二个snippit,因为它清楚地告诉我发生了什么 - 我不需要记住它的语义WaitFor
.
除非你的名字WaitFor
更具描述性 - 也许TakeOneAsTaskWhere
- 然后你就明白了在使用它的代码中使用运算符并使代码更难管理.
以下是否更容易记住语义?
var t = xs.TakeOneAsTaskWhere(x => x > 10);
Run Code Online (Sandbox Code Playgroud)
对我来说,底线是Rx运算符是组合而不是封装,但是如果你要封装它们,那么它们的含义必须清楚.
我希望这有帮助.