如何基于Func <T>将IObservable <T>窗口/缓冲到块中

Ala*_*Maw 5 c# system.reactive observable

给出一个类:

class Foo { DateTime Timestamp {get; set;} }
Run Code Online (Sandbox Code Playgroud)

......并且IObservable<Foo>,在保证单调增加 Timestamp s的情况下,如何IObservable<IList<Foo>>根据这些Timestamps 生成一个列入Lists的列表?

即每个人IList<Foo>应该有五秒钟的事件,或者其他什么.我知道我可以使用Buffer具有TimeSpan过载,但我需要花时间从事件本身,而不是挂钟.(除非有一种聪明的方式来提供一个IScheduler使用IObservable自身作为来源的.Now?)

如果我尝试使用这样的Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)重载:

IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
    x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();

pub.Buffer(windows).Subscribe(x => t.Dump()));  // linqpad
pub.Connect();
Run Code Online (Sandbox Code Playgroud)

...然后IList实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区.

例如,使用时间戳,[0, 1, 10, 11, 15]您将得到块[[0], [1, 10], [11, 15]]而不是[[0, 1], [10, 11], [15]]

Jam*_*rld 6

这是一个想法.组密钥条件是"窗口号",我使用GroupByUntil.这为您提供了示例中所需的输出(我使用了一个类似于该示例的int流 - 但您可以替换您需要的任何内容来编写窗口).

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 0),
            OnNext(1, 1),
            OnNext(10, 10),
            OnNext(11, 11),
            OnNext(15, 15),
            OnCompleted(16, 0));                  

        xs.Publish(ps =>                                // (1)
            ps.GroupByUntil(
                p => p / 5,                             // (2)
                grp => ps.Where(p => p / 5 != grp.Key)) // (3)
            .SelectMany(x => x.ToList()))               // (4)
        .Subscribe(Console.WriteLine);

        scheduler.Start();
    }
}
Run Code Online (Sandbox Code Playgroud)

笔记

  1. 我们发布源流,因为我们将多次订阅.
  2. 这是一个创建组密钥的功能 - 使用此功能从项目类型生成窗口编号.
  3. 这是组终止条件 - 使用它来检查另一个窗口中项目的源流.请注意,这意味着窗口在其外部的元素到达或源流终止之前不会关闭.如果你考虑一下这很明显 - 你需要的输出需要在窗口结束后考虑下一个元素.请注意,如果您的源与实时有任何关系,您可以将其合并为Observable.Timer+Select输出术语的null/default实例以更早地终止流.
  4. SelectMany将组放入列表并展平流.

如果你包含nuget包rx-testing,这个例子将很好地在LINQPad中运行.新建一个Tests实例并运行该Test()方法.