当数据流比订阅者可以消耗的速度快时,Rx如何​​表现?

use*_*350 10 c# system.reactive

我很高兴在生产应用中使用Rx; 我将收听来自不同频道的传入通知更新.

我将在此流的顶部编写Rx查询,我将使用.Window()运算符进行限制.订阅者(在我的例子中是ActionBlock)将以阻塞方式处理此数据; (即它不会从ActionBlock中生成任务).请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么.Rx查询是否在内部使用任何缓冲区; 它会溢出吗?

cwh*_*ris 9

您所指的现象称为Back Pressure,Rx团队目前正在探索处理这种情况的不同方法.一种解决方案可能是将背压传回Observable,以便它可以"减速".

为了减轻背压,您可以使用有损运算符,如Throttle或Sample.

提摩太的答案基本上是正确的,但它可能有背压发生在单个线程.如果使用异步代码,则会发生这种情况.从这个意义上说,背压与同步和调度有关,而不是线程(回想一下,默认情况下Rx是单线程的).

如果遇到生成事件的速度超过消耗速度的情况,并且您没有使用有损运算符来缓解背压,那么这些项目通常会被调度/排队/缓冲,这可能导致很多内存分配.

就个人而言,这对我来说不是一个问题,因为通常事件的处理速度比它们产生的要快,或者事件的丢失根本不是一种选择,因此额外的内存消耗是不可避免的.


Jam*_*rld 5

这实际上取决于各个运营商的实现,但内置的运营商将基于每个订阅缓冲- 因此慢速消费者不会阻止其他订阅者,除非用户共享线程.

在相关的说明中,Rx并不总是保护Rx语法; 例如,您有责任确保不在主题上对OnNext进行并发调用.你可以Observable.Synchronize()用来解决这个问题.