Rx – 与超时不同?

dnf*_*dnf 1 .net c# timeout distinct system.reactive

我想知道是否有任何方法可以在 .NET 的 Reactive Extensions 中实现 Distinct,使其在给定的时间内工作,并且在此之后它应该重置并允许再次返回值。我需要这个应用程序中的热源,该应用程序将全年工作,现在停止,所以我担心性能,我需要在一段时间后允许这些值。还有 DistinctUntilChanged 但在我的情况下,值可以混合 - 例如:AAXA,DistinctUntilChanged 会给我 AXA,我需要结果 AX,并且在给定时间后应该重置 distinct。

Shl*_*omo 5

接受的答案是有缺陷的;缺陷如下所示。这是解决方案的演示,带有一个测试批次:

TestScheduler ts = new TestScheduler();

var source = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(400.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);

var target = source.TimedDistinct(TimeSpan.FromMilliseconds(300), ts);

var expectedResults = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('B'))
);

var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();

ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
Run Code Online (Sandbox Code Playgroud)

解决方案包括许多用于 的重载TimedDistinct,允许IScheduler注入,以及类似于 的IEqualityComparer<T>注入Distinct。忽略所有这些重载,该解决方案建立在一个helper方法StateWhere,这是一种像的组合ScanWhere:它过滤像Where,但可以让你嵌入状态像Scan

public static class RxState
{
    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime)
    {
        return TimedDistinct(source, expirationTime, Scheduler.Default);    
    }

    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IScheduler scheduler)
    {
        return TimedDistinct<TSource>(source, expirationTime, EqualityComparer<TSource>.Default, scheduler);
    }

    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer)
    {
        return TimedDistinct(source, expirationTime, comparer, Scheduler.Default);
    }

    public static IObservable<TSource> TimedDistinct<TSource>(this IObservable<TSource> source, TimeSpan expirationTime, IEqualityComparer<TSource> comparer, IScheduler scheduler)
    {
        var toReturn = source
            .Timestamp(scheduler)
            .StateWhere(
                new Dictionary<TSource, DateTimeOffset>(comparer),
                (state, item) => item.Value,
                (state, item) => state
                    .Where(kvp => item.Timestamp - kvp.Value < expirationTime)
                    .Concat( 
                        !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
                            ? Enumerable.Repeat(new KeyValuePair<TSource, DateTimeOffset>(item.Value, item.Timestamp), 1)
                            : Enumerable.Empty<KeyValuePair<TSource, DateTimeOffset>>()
                    )
                    .ToDictionary(kvp => kvp.Key, kvp => kvp.Value, comparer),
                (state, item) => !state.ContainsKey(item.Value) || item.Timestamp - state[item.Value] >= expirationTime
        );
        return toReturn;
    }

    public static IObservable<TResult> StateSelectMany<TSource, TState, TResult>(
            this IObservable<TSource> source,
            TState initialState,
            Func<TState, TSource, IObservable<TResult>> resultSelector,
            Func<TState, TSource, TState> stateSelector
        )
    {
        return source
            .Scan(Tuple.Create(initialState, Observable.Empty<TResult>()), (state, item) => Tuple.Create(stateSelector(state.Item1, item), resultSelector(state.Item1, item)))
            .SelectMany(t => t.Item2);
    }

    public static IObservable<TResult> StateWhere<TSource, TState, TResult>(
            this IObservable<TSource> source,
            TState initialState,
            Func<TState, TSource, TResult> resultSelector,
            Func<TState, TSource, TState> stateSelector,
            Func<TState, TSource, bool> filter
        )
    {
        return source
            .StateSelectMany(initialState, (state, item) =>
                    filter(state, item) ? Observable.Return(resultSelector(state, item)) : Observable.Empty<TResult>(),
                stateSelector);
    }
}
Run Code Online (Sandbox Code Playgroud)

接受的答案有两个缺陷:

  1. 它不接受IScheduler注入,这意味着很难在 Rx 测试框架内进行测试。这很容易修复。
  2. 它依赖于可变状态,这在像 Rx 这样的多线程框架中不能很好地工作。

问题 #2 在多个订阅者中很明显:

var observable = Observable.Range(0, 5)
    .DistinctFor(TimeSpan.MaxValue)
    ;

observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));
Run Code Online (Sandbox Code Playgroud)

遵循常规 Rx 行为的输出应该输出 0-4 两次。相反,0-4 只输出一次。

这是另一个示例缺陷:

var subject = new Subject<int>();
var observable = subject
    .DistinctFor(TimeSpan.MaxValue);

observable.Subscribe(i => Console.WriteLine(i));
observable.Subscribe(i => Console.WriteLine(i));

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
Run Code Online (Sandbox Code Playgroud)

这输出1 2 3一次,而不是两次。


这是代码MsTicks

public static class RxTestingHelpers
{
    public static long MsTicks(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }

}
Run Code Online (Sandbox Code Playgroud)