无损失的反应(RX)油门

Ian*_*Ian 2 c# system.reactive

在我开始写一个之前,我认为值得一问:RX 有一个油门扩展方法,如果事件发生得太快,它会丢弃事件。

因此,如果您要求它将事件限制为每 5 秒 1 个,如果您在 0.1 秒后收到一个事件,然后在 1 秒后收到第二个事件,您将收到一个事件,然后是静音。

我想要的是它在 0.1 秒后引发第一个事件,但在 4.9 秒后引发另一个事件。

此外,如果我在 0.1、1 和 2 秒收到事件,我希望它在 0.1 秒、5 秒时引发事件,然后什么都不发生,所以我不希望它捕获 n 个事件并且每个周期只释放一个 n 个周期.

Buffer 做相反的事情,因为它将所有内容保存 5 秒,然后引发事件,所以既不是油门也不是缓冲区,而是介于两者之间的东西。

有没有办法用现有的框架来做到这一点,还是我需要写一个?

Lee*_*ell 6

我认为您将不得不编写自己的运算符,或者使用Window. 像其他评论一样,我不是 100% 确定您的要求,但我已尝试在这些测试中捕获它们。

using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

[TestFixture]
public class Throttle : ReactiveTest
{
    private TestScheduler _testScheduler;
    private ITestableObservable<int> _sourceSequence;
    private ITestableObserver<int> _observer;

    [SetUp]
    public void SetUp()
    {
        var windowPeriod = TimeSpan.FromSeconds(5);
        _testScheduler = new TestScheduler();
        _sourceSequence = _testScheduler.CreateColdObservable(
            //Question does the window start when the event starts, or at time 0?
            OnNext(0.1.Seconds(), 1),
            OnNext(1.0.Seconds(), 2),
            OnNext(2.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
            );

        _observer = _testScheduler.CreateObserver<int>();
        _sourceSequence
            .Window(windowPeriod, _testScheduler)
            .SelectMany(window =>
                window.Publish(
                    shared => shared.Take(1).Concat(shared.Skip(1).TakeLast(1))
                )
            )
            .Subscribe(_observer);
        _testScheduler.Start();
    }

    [Test]
    public void Should_eagerly_publish_new_events()
    {
        Assert.AreEqual(OnNext(0.1.Seconds(), 1), _observer.Messages[0]);
    }

    [Test]
    public void Should_publish_last_event_of_a_window()
    {
        //OnNext(1.0.Seconds(), 2) is ignored. As OnNext(5.0.Seconds(), 3) occurs after it, and before the end of a window, it is yeiled.
        Assert.AreEqual(OnNext(5.0.Seconds(), 3), _observer.Messages[1]);
    }

    [Test]
    public void Should_only_publish_event_once_if_it_is_the_only_event_for_the_window()
    {
        Assert.AreEqual(OnNext(7.0.Seconds(), 4), _observer.Messages[2]);
        Assert.AreEqual(OnCompleted<int>(100.0.Seconds()), _observer.Messages[3]);
    }

    [Test]
    public void AsOneTest()
    {
        var expected = new[]
        {
            OnNext(0.1.Seconds(), 1),
            //OnNext(1.0.Seconds(), 2) is ignored. As OnNext(5.0.Seconds(), 3) occurs after it, and before the end of a window, it is yeiled.
            OnNext(5.0.Seconds(), 3),
            OnNext(7.0.Seconds(), 4),
            OnCompleted<int>(100.0.Seconds())
        };
        CollectionAssert.AreEqual(expected, _observer.Messages);
    }
}
Run Code Online (Sandbox Code Playgroud)