Dmi*_*try 7 c# reactive-programming system.reactive observable
我一直在寻找关于如何在rx中使用Observable.Buffer的例子,但找不到比锅炉板时间缓冲的东西更重要的东西.
似乎有一个重载指定一个"bufferClosingSelector",但我无法围绕它思考.
我要做的是创建一个按时间或"累积"缓冲的序列.考虑一个请求流,其中每个请求都有一定的权重,我不希望一次处理超过x累计权重,或者如果累积不够,只要给我最后一个时间帧(常规缓冲区功能) )
Ast*_*sti 15
bufferClosingSelector 是一个函数,每次调用一个Observable,当预期缓冲区被关闭时,它会产生一个值.
例如,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))像常规Buffer(time)超载一样工作.
在您想要对序列进行加权时,您可以应用Scan序列,然后决定您的聚合条件.
例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)给出一个序列,当源序列加起来超过100时,该序列产生一个值.
您可以使用Amb这两个结束条件来查看哪些反应首先:
.Buffer(() => Observable.Amb
(
Observable.Timer(TimeSpan.FromSeconds(1)),
source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
)
)
Run Code Online (Sandbox Code Playgroud)
您可以使用任何一系列组合器,它们可以为缓冲区生成任何值.
注意:
给结束选择器的值无关紧要 - 这是重要的通知.因此,将不同类型的源组合在一起,Amb只需将其更改为System.Reactive.Unit.
Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2110 次 |
| 最近记录: |