带有参数关闭功能的Rx Observable Window

Nik*_*vić 3 system.reactive

我正在尝试将observable分隔成窗口(或者为了我的目的,Buffers也很好),同时能够关闭自定义位置的窗口/缓冲区.

例如,我有一个observable,它从1开始生成整数并向上移动.我想在每个可被7整除的数字处关闭一个窗口.在这种情况下,我的结束函数需要将该项作为参数.

方法有一个重载Window:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)
Run Code Online (Sandbox Code Playgroud)

要么不能使用这个重载,要么我无法绕过它.文档描述它完全符合我的要求,但没有显示示例.此外,它还显示了非确定性结束的示例,它取决于关闭可观察集合发出项目时的时间.

Window运算符将可观察序列分解为连续的非重叠窗口.当前窗口的结束和下一个窗口的开始由可观察序列控制,该序列是windowClosingSelect函数的结果,该函数作为输入参数传递给操作符.运算符可用于将一组事件分组到窗口中.例如,交易的状态可以是被观察的主要序列.这些州可能包括:准备,准备,活动和承诺/中止.主序列可以包括它们按顺序出现的所有状态.windowClosingSelect函数可以返回一个可观察的序列,该序列仅在Committed或Abort状态下生成一个值.这将关闭表示特定事务的事务事件的窗口.

我认为像下面这样的人会做这个工作,但我必须自己实施:

Window<TSource, TWindowClosing>(IObservable<TSource>, Func<TSource, bool>)
Run Code Online (Sandbox Code Playgroud)
  • 内置函数是否可以实现这种窗口化(我知道我可以自己构建一个)?
  • 一旦从窗口可观察的项目中发出一个项目,是否可以基于发出的项目关闭窗口或仅非确定性地关闭窗口?

Bra*_*don 8

使用带有Where子句的原始序列作为结束序列.如果您的源序列很冷,那么请使用PublishRefCount使其正常工作.

var source = ...;
var sharedSource = source.Publish().RefCount();
var closingSignal = sharedSource.Where(i => (i % 7) == 0);
var windows = sharedSource.Window(() => closingSignal);
Run Code Online (Sandbox Code Playgroud)