我正在尝试将observable分隔成窗口(或者为了我的目的,Buffers也很好),同时能够关闭自定义位置的窗口/缓冲区.
例如,我有一个observable,它从1开始生成整数并向上移动.我想在每个可被7整除的数字处关闭一个窗口.在这种情况下,我的结束函数需要将该项作为参数.
方法有一个重载Window:
Window<TSource, TWindowClosing>(IObservable<TSource>, Func<IObservable<TWindowClosing>>)
Run Code Online (Sandbox Code Playgroud)
要么不能使用这个重载,要么我无法绕过它.文档描述它完全符合我的要求,但没有显示示例.此外,它还显示了非确定性结束的示例,它取决于关闭可观察集合发出项目时的时间.
Window运算符将可观察序列分解为连续的非重叠窗口.当前窗口的结束和下一个窗口的开始由可观察序列控制,该序列是windowClosingSelect函数的结果,该函数作为输入参数传递给操作符.运算符可用于将一组事件分组到窗口中.例如,交易的状态可以是被观察的主要序列.这些州可能包括:准备,准备,活动和承诺/中止.主序列可以包括它们按顺序出现的所有状态.windowClosingSelect函数可以返回一个可观察的序列,该序列仅在Committed或Abort状态下生成一个值.这将关闭表示特定事务的事务事件的窗口.
我认为像下面这样的人会做这个工作,但我必须自己实施:
Window<TSource, TWindowClosing>(IObservable<TSource>, Func<TSource, bool>)
Run Code Online (Sandbox Code Playgroud)
使用带有Where子句的原始序列作为结束序列.如果您的源序列很冷,那么请使用Publish并RefCount使其正常工作.
var source = ...;
var sharedSource = source.Publish().RefCount();
var closingSignal = sharedSource.Where(i => (i % 7) == 0);
var windows = sharedSource.Window(() => closingSignal);
Run Code Online (Sandbox Code Playgroud)