在没有onBackpressureLatest的情况下处理Rx.NET中的背压

Ste*_*eve 9 c# f# reactive-programming system.reactive

我需要在Rx.NET中实现以下算法:

  1. stream如果没有新商品,请从中获取最新商品,或等待新商品不加阻止.只有最新项目很重要,其他项目可以删除.
  2. 输入项目SlowFunction并打印输出.
  3. 从步骤1开始重复.

天真的解决方案是:

let PrintLatestData (stream: IObservable<_>) =
    stream.Select(SlowFunction).Subscribe(printfn "%A")
Run Code Online (Sandbox Code Playgroud)

但是,此解决方案不起作用,因为平均发布stream项目的速度快于SlowFunction消耗它们的速度.由于Select不会丢弃项目,而是尝试按从最旧到最新的顺序处理每个项目,因此在程序运行时,正在发出和打印的项目之间的延迟将增加到无穷大.只应从流中获取最新的最新项目,以避免这种无限增长的背压.

我搜索了文档,发现了一个onBackpressureLatest在RxJava中调用的方法,根据我的理解,我会按照上面的描述进行操作.但是,该方法在Rx.NET中不存在.如何在Rx.NET中实现这一点?

Lee*_*ell 9

我想你想用类似的东西ObserveLatestOn.它有效地用单个值和一个标志替换传入事件的队列.

詹姆斯世界在http://www.zerobugbuild.com/?p=192上发表了关于它的博客

该概念在GUI应用程序中被大量使用,这些应用程序无法信任服务器在其上推送数据的速度.

您还可以在Reactive Trader中看到一个实现https://github.com/AdaptiveConsulting/ReactiveTrader/blob/83a6b7f312b9ba9d70327f03d8d326934b379211/src/Adaptive.ReactiveTrader.Shared/Extensions/ObservableExtensions.cs#L64 以及解释ReactiveTrader的支持演示文稿https:// leecampbell.com/presentations/#ReactConfLondon2014

要清楚这是一个减载算法,而不是背压算法.