我有一个Observable序列,可以快速突发产生事件(即:一个接一个地发生五个事件,然后是长时间延迟,然后是另一个快速突发事件等).我希望通过在事件之间插入一个短暂的延迟来平滑这些突发.想象一下以下图表作为示例:
Raw: --oooo--------------ooooo-----oo----------------ooo| Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|
我目前的方法是在可以Observable.Interval()从原始流中拉出另一个事件时通过该信号生成类似节拍器的计时器.问题是我无法弄清楚如何将该计时器与我的原始无缓冲可观察序列相结合.
IObservable.Zip()接近于做我想要的,但它只有在原始流比定时器更快地产生事件时才有效.一旦原始流中存在显着的间歇,计时器就会建立一系列不需要的事件,然后立即与原始流中的下一个事件突发事件配对.
理想情况下,我想要一个具有以下函数签名的IObservable扩展方法,该方法生成我上面概述的bevaior.现在,来救我的StackOverflow :)
public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)
Run Code Online (Sandbox Code Playgroud)
PS.我是Rx的新手,所以如果这是一个简单的简单问题我很抱歉......
这是我最初的天真和简单的解决方案,它有很多问题:
public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
Queue<T> q = new Queue<T>();
source.Subscribe(x => q.Enqueue(x));
return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}
Run Code Online (Sandbox Code Playgroud)
第一个明显的问题是内部订阅返回到原始源的IDisposable丢失,因此订阅无法终止.在此方法返回的IDisposable上调用Dispose会终止计时器,但不会触发现在不必要地填充队列的基础原始事件源,没有人从队列中提取事件.
第二个问题是,从原始事件流到缓冲流,无法通过异常或流末尾通知进行传播 - 在订阅原始源时,它们将被忽略.
最后但并非最不重要的是,现在我已经有了定期唤醒的代码,无论是否有任何工作要做,我宁愿避免在这个美妙的新反应世界中.
为了解决我最初的简单的方法遇到的问题,我写了很多更复杂的功能,其行为很像IObservable.Delay()(我用.net反射来读取这些代码,并用它作为我的函数的基础上).不幸的是,很多样板逻辑AnonymousObservable都不能在system.reactive代码之外公开访问,因此我不得不复制并粘贴大量代码.这个解决方案似乎有效,但考虑到它的复杂性,我对它的bug没有信心.
我无法相信没有办法使用标准的Reactive扩展的某些组合来实现这一点.我讨厌感觉我不必要地重新发明轮子,我试图建立的模式似乎是一个相当标准的模式.
我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发形式进行)并进一步释放它们,但是以偶数间隔逐个释放它们.像这样:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Run Code Online (Sandbox Code Playgroud)
因为我对Rx很新,所以我不确定是否已经有一个主题或运算符.也许它可以通过作曲来完成?
更新:
感谢 Richard Szalay指出Drain操作符,我找到了James Miles的Drain操作符使用的另一个例子.这是我如何设法让它在WPF应用程序中工作:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
}).Subscribe();
Run Code Online (Sandbox Code Playgroud)
我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何异常(我需要学习如何处理Rx中的异常).Process方法直接修改了UI状态,但我想从它中生成一个IObservable非常简单(使用ISubject?).
更新:
与此同时,我一直在试验ISubject,下面的课程做了我想要的 - 它及时让出缓冲的Ts:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue) …Run Code Online (Sandbox Code Playgroud) 我有一个API调用,它接受每秒最大调用率.如果超过速率,则抛出异常.
我想把这个调用包装成一个抽象,它可以使调用率保持在极限之下.它将像网络路由器一样:处理多个呼叫并将结果返回给正确的呼叫者,关注呼叫率.目标是使调用代码尽可能不知道该限制.否则,具有此调用的代码中的每个部分都必须包装到try-catch中!
例如:想象一下,您可以从可以添加2个数字的extern API调用方法.此API可以每秒调用5次.高于此值的任何内容都将导致异常.
为了说明问题,限制通话费率的外部服务就像这个问题的答案中的那个
附加信息:
由于每次从代码的任何部分调用此方法时都不希望担心该限制,因此您可以考虑设计一个可以调用的包装器方法,而不必担心速率限制.在内部你关心限制,但在外面你暴露了一个简单的异步方法.
它类似于Web服务器.它如何将正确的结果包返回给正确的客户?
多个呼叫者将调用此方法,他们将在结果出现时获得结果.这种抽象应该像代理一样.
我怎么能这样做?
我确信包装方法的公司应该是这样的
public async Task<Results> MyMethod()
Run Code Online (Sandbox Code Playgroud)
在方法内部,它将执行逻辑,可能使用Reactive Extensions(Buffer).我不知道.
但是怎么样?我的意思是,多次调用此方法应该将结果返回给正确的调用者.这甚至可能吗?
非常感谢!