'等待'是一个可观察的

Ben*_*jol 10 c# task-parallel-library system.reactive

我正处于这样一种情况:我正在处理一系列任务(启用驱动器,更改位置,等待停止,禁用).

'wait for'监视一个IObservable<Status>,我想等待(所以我可以通过它ContinueWith和其他任务).

我开始了下列任务的用户的OnNext处理,但那只是丑陋.我现在想出的是这种扩展方法:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1)  //OnCompletes the observable, subscription will self-dispose
        .Subscribe(val => tcs.TrySetResult(val),
                    ex => tcs.TrySetException(ex),
                    () => tcs.TrySetCanceled());

    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

(更新了svick的处理建议OnCompletedOnError)

问题:

  • 这是好事,坏事还是丑陋?
  • 我是否错过了可以做到这一点的现有扩展?
  • WhereDistinctUntilChanged正确的顺序?(我认为他们是)

Eni*_*ity 11

至少我会将此扩展方法更改为:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}
Run Code Online (Sandbox Code Playgroud)

使用.ToTask()比引入要好得多TaskCompletionSource.您需要对System.Reactive.Threading.Tasks命名空间的引用才能获得.ToTask()扩展方法.

此外,DistinctUntilChanged在此代码中是多余的.您只获得一个值,因此默认情况下它必须是不同的.

现在,我的下一个建议可能有点争议.这个扩展是一个坏主意,因为它隐藏了正在发生的事情的真实语义.

如果我有这两个代码的snippits:

var t = xs.WaitFor(x => x > 10);
Run Code Online (Sandbox Code Playgroud)

要么:

var t = xs.Where(x => x > 10).Take(1).ToTask();
Run Code Online (Sandbox Code Playgroud)

我通常更喜欢第二个snippit,因为它清楚地告诉我发生了什么 - 我不需要记住它的语义WaitFor.

除非你的名字WaitFor更具描述性 - 也许TakeOneAsTaskWhere- 然后你就明白了在使用它的代码中使用运算符并使代码更难管理.

以下是否更容易记住语义?

var t = xs.TakeOneAsTaskWhere(x => x > 10);
Run Code Online (Sandbox Code Playgroud)

对我来说,底线是Rx运算符是组合而不是封装,但是如果你要封装它们,那么它们的含义必须清楚.

我希望这有帮助.