Ala*_*Maw 5 c# system.reactive observable
给出一个类:
class Foo { DateTime Timestamp {get; set;} }
Run Code Online (Sandbox Code Playgroud)
......并且IObservable<Foo>
,在保证单调增加 Timestamp
s的情况下,如何IObservable<IList<Foo>>
根据这些Timestamp
s 生成一个列入Lists的列表?
即每个人IList<Foo>
应该有五秒钟的事件,或者其他什么.我知道我可以使用Buffer
具有TimeSpan
过载,但我需要花时间从事件本身,而不是挂钟.(除非有一种聪明的方式来提供一个IScheduler
使用IObservable
自身作为来源的.Now
?)
如果我尝试使用这样的Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)
重载:
IObservable<Foo> foos = //...;
var pub = foos.Publish();
var windows = pub.Select(x => new DateTime(
x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged();
pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad
pub.Connect();
Run Code Online (Sandbox Code Playgroud)
...然后IList
实例包含导致窗口关闭的项目,但我真的希望这个项目进入下一个窗口/缓冲区.
例如,使用时间戳,[0, 1, 10, 11, 15]
您将得到块[[0], [1, 10], [11, 15]]
而不是[[0, 1], [10, 11], [15]]
这是一个想法.组密钥条件是"窗口号",我使用GroupByUntil
.这为您提供了示例中所需的输出(我使用了一个类似于该示例的int流 - 但您可以替换您需要的任何内容来编写窗口).
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateHotObservable<int>(
OnNext(0, 0),
OnNext(1, 1),
OnNext(10, 10),
OnNext(11, 11),
OnNext(15, 15),
OnCompleted(16, 0));
xs.Publish(ps => // (1)
ps.GroupByUntil(
p => p / 5, // (2)
grp => ps.Where(p => p / 5 != grp.Key)) // (3)
.SelectMany(x => x.ToList())) // (4)
.Subscribe(Console.WriteLine);
scheduler.Start();
}
}
Run Code Online (Sandbox Code Playgroud)
Observable.Timer+Select
输出术语的null/default实例以更早地终止流.如果你包含nuget包rx-testing,这个例子将很好地在LINQPad中运行.新建一个Tests实例并运行该Test()
方法.