我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发形式进行)并进一步释放它们,但是以偶数间隔逐个释放它们.像这样:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Run Code Online (Sandbox Code Playgroud)
因为我对Rx很新,所以我不确定是否已经有一个主题或运算符.也许它可以通过作曲来完成?
更新:
感谢 Richard Szalay指出Drain操作符,我找到了James Miles的Drain操作符使用的另一个例子.这是我如何设法让它在WPF应用程序中工作:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
Run Code Online (Sandbox Code Playgroud)
我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何异常(我需要学习如何处理Rx中的异常).Process方法直接修改了UI状态,但我想从它中生成一个IObservable非常简单(使用ISubject?).
更新:
与此同时,我一直在试验ISubject,下面的课程做了我想要的 - 它及时让出缓冲的Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue) …Run Code Online (Sandbox Code Playgroud)