我每秒树生成50个项目的序列.然后我想在最多20个项目中批量处理它们,但在释放缓冲区之前也不要等待超过一秒钟.
这很棒!
但是由于间隔永远不会消失,Buffer会持续发射空批量块...
我怎么能避免这种情况?舒尔Where(buf => buf.Count > 0)应该有所帮助 - 但这似乎是一个黑客.
Observable
.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge()
.Buffer(TimeSpan.FromSeconds(1), 20)
.Subscribe(e => Console.WriteLine(e.Count));
Run Code Online (Sandbox Code Playgroud)
输出:
0-0-0-20-20-10-0-20-20-10-0-0-20-20
Run Code Online (Sandbox Code Playgroud)
Where你提出的过滤器是一种合理的方法,我会选择它.
你可以包裹住Buffer,并Where为命名,使意图更加清晰也许一个helper方法,但放心的Where条款是地道的RX在这种情况下.
这样想吧; 空缓冲区正在中继最后一秒没有发生事件的信息.虽然您可以认为这是隐含的,但如果Buffer没有发出空列表,则需要额外的工作来检测它.它只是发生了它不是你感兴趣的信息 - 所以这Where是过滤这些信息的适当方法.
懒惰的计时器解决方案
从你的评论("...计时器......懒洋洋地启动...")开始,你可以这样做来创建一个懒惰的计时器并省略零计数:
var source = Observable.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge();
var xs = source.Publish(pub =>
pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
.Merge(pub.Skip(19)).Take(1)));
xs.Subscribe(x => Console.WriteLine(x.Count));
Run Code Online (Sandbox Code Playgroud)
说明
出版
此查询需要多次订阅源事件.为了避免意外的副作用,我们使用Publish了pub一个流,它组播source创建只有一个订阅它.这取代了Publish().RefCount()实现相同目标的旧技术,有效地为我们提供了源代码流的"热门"版本.
在这种情况下,这是必要的,以确保在第一个将从当前事件开始后产生的后续缓冲关闭流 - 如果源是冷的,它们将每次重新开始.我在这里写了一些关于发布的文章.
主要查询
我们使用一个重载Buffer接受一个工厂函数,该函数为每个发出的缓冲区调用,以获得一个可观察的流,其第一个事件是终止当前缓冲区的信号.
在这种情况下,当缓冲区中的第一个事件已经存在一整秒,或者当源中出现20个事件时,我们希望终止缓冲区- 以先到者为准.
为了达到这个目的,我们Merge描述了每种情况的流 - Take(1).Delay(...)组合描述了第一个条件,而Skip(19).Take(1)描述了第二个条件.
但是,我仍然会以简单的方式测试性能,因为我仍然怀疑这是过度的,但很大程度上取决于平台和场景的精确细节等.
| 归档时间: |
|
| 查看次数: |
3199 次 |
| 最近记录: |