根据值限制IObservable

Che*_*tah 6 c# system.reactive

我有一个IObservable<String>.

我试图检测(和处理)短时间连续通知相同字符串的情况.

我想要一个filter/stream/observable,如果相同的字符串在彼此相差250ms内得到通知,它只会通知一次.

不确定从哪里开始.

Jam*_*rld 11

这是一个相当紧凑的解决方案.你的帖子有点暧昧关于持续时间是否会在一个明确的值到来时立即重置 - 所以我为这两种解释提供了两种解决方案.

变化1 - 不同的"介于"值之间不会重置计时器

这是当你严格关注抑制的持续时间而不关心是否有任何"中间"值(根据McGarnagle的解决方案) - 即如果你得到"a", "b" ,"a"快速,你仍然想要抑制第二个"a".幸运的是,这GroupByUntil对于持续时间的哪些组来说非常容易,并且会发出每个组的第一个元素:

    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.GroupByUntil(k => k,
                                   _ => Observable.Timer(duration, scheduler))
                     .SelectMany(y => y.FirstAsync());
    }
Run Code Online (Sandbox Code Playgroud)

如果你想知道方法名称 - 我首先想出了Variation 2b; 我把名字留在上面,因此单元测试仍然通过.它可能需要一个更好的名称,如SuppressDuplicatesWithinWindow或类似...

变化2a-"在两者之间"的不同值DO重置计时器

这稍微复杂一些 - 现在不同组中的任何事件都将结束给定的组.我使用Publish().RefCount()组合来防止对源的多个订阅,并且必须非常小心null:

public static IObservable<T> DistinctUntilChanged<T>(
    this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
    if (scheduler == null) scheduler = Scheduler.Default;

    var sourcePub = source.Publish().RefCount();

    return sourcePub.GroupByUntil(
        k => k,
        x => Observable.Timer(duration, scheduler)
                       .TakeUntil(
                           sourcePub.Where(i => ReferenceEquals(null, i)
                                                ? !ReferenceEquals(null, x.Key)
                                                : !i.Equals(x.Key))))
        .SelectMany(y => y.FirstAsync());
}
Run Code Online (Sandbox Code Playgroud)

变化2b

这是我尝试的原始方法,我已经添加了它,因为它现在并不是那么糟糕,因为我对2a的改进使它更复杂:

它是一种Observable.DistinctUntilChanged接受持续时间的变体.给定事件,该持续时间内的连续重复事件被抑制.如果到达不同的事件,或者事件在该持续时间之外到达,则发出该事件并重置抑制计时器.

它通过使用DistinctUntilChanged接受IEqualityComparer 的重载来工作.如果值匹配且时间戳在指定的持续时间内,则比较器会将应用了TimeStamp的事件视为相等.

public static partial class ObservableExtensions
{
    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.Timestamp(scheduler)
                     .DistinctUntilChanged(new Comparer<T>(duration))
                     .Select(ts => ts.Value);
    }

    private class Comparer<T> : IEqualityComparer<Timestamped<T>>
    {
        private readonly TimeSpan _duration;

        public Comparer(TimeSpan duration)
        {
            _duration = duration;
        }

        public bool Equals(Timestamped<T> x, Timestamped<T> y)
        {
            if (y.Timestamp - x.Timestamp > _duration) return false;

            return ReferenceEquals(x.Value, y.Value)
                   && !ReferenceEquals(null,x.Value)
                   && x.Value.Equals(y.Value);
        }

        public int GetHashCode(Timestamped<T> obj)
        {
            if (ReferenceEquals(null,obj.Value)) return obj.Timestamp.GetHashCode();
            return obj.Value.GetHashCode() ^ obj.Timestamp.GetHashCode();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

以下是我使用的单元测试(包括nuget包rx-testing和nunit):

public class TestDistinct : ReactiveTest
{
    [Test]
    public void DuplicateWithinDurationIsSupressed()
    {
        var scheduler = new TestScheduler();
        var source =scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void NonDuplicationWithinDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100,"a"),
            OnNext(200,"b"));
    }

    [Test]
    public void DuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "a"));
    }

    [Test]
    public void NonDuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "b"));
    }

    [Test]
    public void TestWithSeveralValues()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));
    }

    [Test]
    public void CanHandleNulls()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null),
            OnNext(700, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null));
    }

    [Test]
    public void TwoDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(150, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void TwoNullDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, (string)null),
            OnNext(150, (string)null),
            OnNext(200, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, (string)null));
    }
}
Run Code Online (Sandbox Code Playgroud)

最后为了完整性 - 变体1将传递TestWithSeveralValues测试的以下变体:

    [Test]
    public void TestWithSeveralValuesVariation1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(900, "a"));
    }
Run Code Online (Sandbox Code Playgroud)

而null测试将改为最终:

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(700, (string)null)); /* This line changes */
Run Code Online (Sandbox Code Playgroud)

  • 这非常棒.我一直在寻找一个解决类似问题的优雅解决方案(最好通过"变体2"解决).如果我能再次投票,我愿意. (2认同)

McG*_*gle 1

您正在寻找Observable.Throttle

忽略可观察序列中的值,这些值在指定源和 dueTime 的到期时间之前后跟另一个值。

编辑

好的,所以上面的方法只能限制序列中的所有元素,而不是按照 OP 的键。我认为这将是一个简单的下一步,但也许不是那么容易?(F# 有一个split很有帮助的函数,但显然没有 C# 的等效函数。)

因此,这是实施的尝试Split

public static class Extension
{
    public static IDisposable SplitSubscribe<T, TKey>(
        this IObservable<T> source, 
        Func<T, TKey> keySelector, 
        Action<IObservable<TKey>> subscribe)
    {
        // maintain a list of Observables, one for each key (TKey)
        var observables = new ConcurrentDictionary<TKey, Subject<TKey>>();

        // function to create a new Subject
        Func<TKey, Subject<TKey>> createSubject = key =>
        {
            Console.WriteLine("Added for " + key);
            var retval = new Subject<TKey>();
            subscribe(retval);
            retval.OnNext(key);
            return retval;
        };

        // function to update an existing Subject
        Func<TKey, Subject<TKey>, Subject<TKey>> updateSubject = (key, existing) =>
        {
            Console.WriteLine("Updated for " + key);
            existing.OnNext(key);
            return existing;
        };

        return source.Subscribe(next =>
        {
            var key = keySelector(next);
            observables.AddOrUpdate(key, createSubject, updateSubject);
        });
        // TODO dispose of all subscribers
    }

    // special case: key selector is just the item pass-through
    public static IDisposable SplitSubscribe<T>(
        this IObservable<T> source, 
        Action<IObservable<T>> subscribe)
    {
        return source.SplitSubscribe(item => item, subscribe);
    }
}
Run Code Online (Sandbox Code Playgroud)

使用此函数,您可以拆分一个可观察源,然后对每个源进行限制。用法是这样的:

IObservable<string> dummyObservable = new string[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();

dummyObservable.SplitSubscribe(next => 
    next.Throttle(TimeSpan.FromMilliseconds(250)).Subscribe(Console.WriteLine));
Run Code Online (Sandbox Code Playgroud)

输出(不保持原始顺序)

添加了一个
为 b 添加
更新为
更新为 b
更新为 b
为 c 添加
更新为
A
C
乙