RX Observable.TakeWhile在每个元素之前检查条件,但我需要在之后执行检查

NoP*_*God 8 c# system.reactive

Observable.TakeWhile允许你运行一个序列,只要条件为真(使用委托,所以我们可以对实际的序列对象执行计算),但是它会在每个元素之前检查这个条件.如何在每个元素之后执行相同的检查?

以下代码演示了此问题

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }
Run Code Online (Sandbox Code Playgroud)

这输出

1 of 3
2 of 3
Run Code Online (Sandbox Code Playgroud)

我无法得到第三个元素

看看这个例子,你可能认为我所要做的就是改变我的状况 -

var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);
Run Code Online (Sandbox Code Playgroud)

但是,observable永远不会完成(因为在我的真实世界代码中,流不会在这三个命令之后结束)

Ric*_*lay 15

没有内置的运算符来执行您所要求的操作,但是这里有一个Publish用于运行两个查询而只订阅底层的observable一次:

// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.Publish(co => co.TakeWhile(predicate)
        .Merge(co.SkipWhile(predicate).Take(1)));
}
Run Code Online (Sandbox Code Playgroud)

然后:

var obs = listOfCommands.ToObservable()
    .TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
Run Code Online (Sandbox Code Playgroud)

  • 我认为当这被称为TakeUntil而不是TakeWhileInclusive时,用法可以更好地阅读.例如,Status.TakeUntil(s => s == Status.Completed)读取优于Status.TakeWhileInclusive(s => s!= Status.Completed). (2认同)

NoP*_*God 5

最终编辑:

我在此线程中基于Sergey的TakeWhileInclusive实现实现了解决方案- 如何根据事件中的条件完成Rx Observable

public static IObservable<TSource> TakeUntil<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
        {
            o.OnNext(x);
            if (predicate(x))
                o.OnCompleted();
        },
        o.OnError,
        o.OnCompleted
    ));
}
Run Code Online (Sandbox Code Playgroud)