如何基于时间和计数进行缓冲,但如果没有事件发生则停止计时器

Lar*_*sen 9 system.reactive

我每秒树生成50个项目的序列.然后我想在最多20个项目中批量处理它们,但在释放缓冲区之前也不要等待超过一秒钟.

这很棒!

但是由于间隔永远不会消失,Buffer会持续发射空批量块...

我怎么能避免这种情况?舒尔Where(buf => buf.Count > 0)应该有所帮助 - 但这似乎是一个黑客.

Observable
    .Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge()
    .Buffer(TimeSpan.FromSeconds(1), 20)
    .Subscribe(e => Console.WriteLine(e.Count));
Run Code Online (Sandbox Code Playgroud)

输出:

0-0-0-20-20-10-0-20-20-10-0-0-20-20
Run Code Online (Sandbox Code Playgroud)

Jam*_*rld 7

Where你提出的过滤器是一种合理的方法,我会选择它.

你可以包裹住Buffer,并Where为命名,使意图更加清晰也许一个helper方法,但放心的Where条款地道的RX在这种情况下.

这样想吧; 空缓冲区正在中继最后一秒没有发生事件的信息.虽然您可以认为这是隐含的,但如果Buffer没有发出空列表,则需要额外的工作来检测它.它只是发生了它不是你感兴趣的信息 - 所以这Where是过滤这些信息的适当方法.

懒惰的计时器解决方案

从你的评论("...计时器......懒洋洋地启动...")开始,你可以这样做来创建一个懒惰的计时器并省略零计数:

var source = Observable.Interval(TimeSpan.FromSeconds(3))
                    .Select(n => Observable.Repeat(n, 50))
                    .Merge();

var xs = source.Publish(pub =>
    pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
                        .Merge(pub.Skip(19)).Take(1)));

xs.Subscribe(x => Console.WriteLine(x.Count));
Run Code Online (Sandbox Code Playgroud)

说明

出版

此查询需要多次订阅源事件.为了避免意外的副作用,我们使用Publishpub一个流,它组播source创建只有一个订阅它.这取代了Publish().RefCount()实现相同目标的旧技术,有效地为我们提供了源代码流的"热门"版本.

在这种情况下,这是必要的,以确保在第一个将从当前事件开始后产生的后续缓冲关闭流 - 如果源是冷的,它们将每次重新开始.我在这里写了一些关于发布的文章.

主要查询

我们使用一个重载Buffer接受一个工厂函数,该函数为每个发出的缓冲区调用,以获得一个可观察的流,其第一个事件是终止当前缓冲区的信号.

在这种情况下,当缓冲区的第一个事件已经存在一整秒,或者当源中出现20个事件时,我们希望终止缓冲区- 以先到者为准.

为了达到这个目的,我们Merge描述了每种情况的流 - Take(1).Delay(...)组合描述了第一个条件,而Skip(19).Take(1)描述了第二个条件.

但是,我仍然会以简单的方式测试性能,因为我仍然怀疑这是过度的,但很大程度上取决于平台和场景的精确细节等.