如何使用Rx(Reactive Extensions)粘贴Observable序列中的相对延迟

Ant*_*nio 5 .net delay reactive-programming system.reactive

我正在使用Reactive扩展(版本2.1,以防万一)开始开发,对于我的示例应用程序,我需要按一些间隔推送一系列int值,即每1秒.

我知道,我可以创建一个序列,Observable.Range<int>(0,10)但我无法弄清楚如何设置推送之间的相对时间.我已经尝试过Delay()但只在开始时只移动一次序列.

然后我找到Observable.Generate()了可以通过下一步方式调整到此任务的方法:

var delayed = Observable.
              Generate(0, i => i <= 10, i => i + 1, i => i,
                          i => TimeSpan.FromSeconds(1));
Run Code Online (Sandbox Code Playgroud)

但这似乎只适用于简单的'for-each-like'定义序列.所以,一般来说,我的问题是,我们是否可以获取任何源序列并用一些代理来包装它,这些代理将从源中提取消息并进一步推迟它?

S--d1--d2--d3--d4--d5-|
D--d1-delay-d2-delay-d3-delay-d4-delay-d5-|
Run Code Online (Sandbox Code Playgroud)

PS如果这种方法与ReactiveExtensions的概念相矛盾,请同时注意这一点.我不想"通过各种手段"这样做,并且他们将来会遇到一些其他的设计问题.

PPS一般理念是确保输出序列在事件之间具有指定的间隔,尽管输入序列是有限的还是无限的以及它推动事件的频率.

Jam*_*rld 8

Observable.Interval是你想要看的.它将生成一个基于0的长值,在您指定的每个时间间隔内递增1,例如:

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine(x));
Run Code Online (Sandbox Code Playgroud)

然后,您可以Select根据需要使用projection()来偏移/更改此值.

您还可以使用Zip运算符将一个流"调整"为另一个流 - 您可能也想查看它.Zip将来自两个流的事件组合在一起,因此它以当前最慢的流的速度发出.Zip也非常灵活,它可以压缩任意数量的流,甚至可以将IObservable压缩到IEnumerable.这是一个例子:

var pets = new List<string> { "Dog", "Cat", "Elephant" };
var pace = Observable.Interval(TimeSpan.FromSeconds(1))
    .Zip(pets, (n, p) => p)     
    .Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done"));
Run Code Online (Sandbox Code Playgroud)

这会以1秒的间隔写出宠物.

根据上面添加的PPS,我将给出另一个答案 - 我将留下这个作为参考,因为它无论如何都是一种有用的技术.