使用IObservable进行批处理

ani*_*vas 4 c# .net-4.0 system.reactive

我的服务器端向我发送了批量消息.批处理和频率中的消息是任意的.有时我会以1分钟的间隔收到消息,有时候不会收到一小时的消息.有时是1条消息,有时是10.我当前的实现用于Observable.Buffer(TimeSpan.FromSeconds(5))分组消息并将消息发送给用户.

有没有一种方法可以配置Observable,如果两条消息之间有几秒钟的延迟,则将缓冲的消息发送给用户.

我所处的位置是避免每5秒钟不必要的计时器滴答声.打开其他建议以优化批处理.

Jam*_*rld 9

使用bufferClosingSelector工厂方法

decPL建议使用Buffer接受a 的重载bufferClosingSelector- 在新缓冲区打开时调用的工厂函数.它产生一个流,其第一个OnNext()OnCompleted()信号刷新当前缓冲区.decPL代码看起来像这样:

observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
Run Code Online (Sandbox Code Playgroud)

这在解决方案方面取得了相当大的进展,但它存在一些问题:

  • 在活动期间,服务器不会在节流持续时间内始终发布消息时发送消息.这可能导致大量的,不经常发布的列表.
  • 订阅源有多个订阅; 如果感冒,可能会产生意想不到的副作用.该bufferClosingSelector工厂后调用每一个缓冲结束,所以如果源是冷它就会从初始事件进行节流,而不是最新的.

防止无限制的节流

我们需要使用额外的机制来限制缓冲区长度并防止无限制的限制.Buffer有一个重载,允许您指定最大长度,但遗憾的是,您无法将其与结束选择器结合使用.

让我们调用所需的缓冲区长度限制n.回想一下,OnNext关闭选择器的第一个就足以关闭缓冲区了,所以我们需要做的就是Merge使用一个计数流来节流,该计数流OnNext在来自源的n个事件之后发送.我们可以用.Take(n).LastAsync()它来做; 采取前n个事件但忽略除最后一个之外的所有事件.这是Rx中非常有用的模式.

使源"热"

为了解决bufferClosingSelector工厂重新订阅源的问题,我们需要使用.Publish().RefCount()源上的通用模式为我们提供仅向订阅者发送最新事件的流.这也是一个非常有用的模式.

这是重新设计的代码,其中节流持续时间与计数器合并:

var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;

// single subscription to source
var sourcePub = source.Publish().RefCount();

var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration) 
                   .Merge(sourcePub.Take(bufferSize).LastAsync()));
Run Code Online (Sandbox Code Playgroud)

生产准备代码和测试

这是一个带有测试的生产就绪实现(使用nuget包rx-testing&nunit).请注意调度程序的参数化以支持测试.

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        var publishedSource = source.Publish().RefCount();

        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}

public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new [] {1, 2, 3};
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }

    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1},
            new[] {2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };  

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }

    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] {1,2},
            new[] {3}
        };

        var expectedTimes = new[]
        {
            200, /* Buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();

        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
              .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }

    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 我还改进了解释,以更清楚地涵盖这一点. (2认同)