Observable.Buffer是否可能在时间以外的东西上

Dmi*_*try 7 c# reactive-programming system.reactive observable

我一直在寻找关于如何在rx中使用Observable.Buffer的例子,但找不到比锅炉板时间缓冲的东西更重要的东西.

似乎有一个重载指定一个"bufferClosingSelector",但我无法围绕它思考.

我要做的是创建一个按时间或"累积"缓冲的序列.考虑一个请求流,其中每个请求都有一定的权重,我不希望一次处理超过x累计权重,或者如果累积不够,只要给我最后一个时间帧(常规缓冲区功能) )

Ast*_*sti 15

bufferClosingSelector 是一个函数,每次调用一个Observable,当预期缓冲区被关闭时,它会产生一个值.

例如,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))像常规Buffer(time)超载一样工作.

在您想要对序列进行加权时,您可以应用Scan序列,然后决定您的聚合条件.

例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)给出一个序列,当源序列加起来超过100时,该序列产生一个值.

您可以使用Amb这两个结束条件来查看哪些反应首先:

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )
Run Code Online (Sandbox Code Playgroud)

您可以使用任何一系列组合器,它们可以为缓冲区生成任何值.

注意: 给结束选择器的值无关紧要 - 这是重要的通知.因此,将不同类型的源组合在一起,Amb只需将其更改为System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
Run Code Online (Sandbox Code Playgroud)