Ste*_*eve 9 c# f# reactive-programming system.reactive
我需要在Rx.NET中实现以下算法:
stream
如果没有新商品,请从中获取最新商品,或等待新商品不加阻止.只有最新项目很重要,其他项目可以删除.SlowFunction
并打印输出.天真的解决方案是:
let PrintLatestData (stream: IObservable<_>) =
stream.Select(SlowFunction).Subscribe(printfn "%A")
Run Code Online (Sandbox Code Playgroud)
但是,此解决方案不起作用,因为平均发布stream
项目的速度快于SlowFunction
消耗它们的速度.由于Select
不会丢弃项目,而是尝试按从最旧到最新的顺序处理每个项目,因此在程序运行时,正在发出和打印的项目之间的延迟将增加到无穷大.只应从流中获取最新的最新项目,以避免这种无限增长的背压.
我搜索了文档,发现了一个onBackpressureLatest
在RxJava中调用的方法,根据我的理解,我会按照上面的描述进行操作.但是,该方法在Rx.NET中不存在.如何在Rx.NET中实现这一点?
我想你想用类似的东西ObserveLatestOn
.它有效地用单个值和一个标志替换传入事件的队列.
詹姆斯世界在http://www.zerobugbuild.com/?p=192上发表了关于它的博客
该概念在GUI应用程序中被大量使用,这些应用程序无法信任服务器在其上推送数据的速度.
您还可以在Reactive Trader中看到一个实现https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 以及解释ReactiveTrader的支持演示文稿https:// leecampbell.com/presentations/#ReactConfLondon2014
要清楚这是一个减载算法,而不是背压算法.
归档时间: |
|
查看次数: |
1755 次 |
最近记录: |