我有一个Observable序列,可以快速突发产生事件(即:一个接一个地发生五个事件,然后是长时间延迟,然后是另一个快速突发事件等).我希望通过在事件之间插入一个短暂的延迟来平滑这些突发.想象一下以下图表作为示例:
Raw: --oooo--------------ooooo-----oo----------------ooo| Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|
我目前的方法是在可以Observable.Interval()从原始流中拉出另一个事件时通过该信号生成类似节拍器的计时器.问题是我无法弄清楚如何将该计时器与我的原始无缓冲可观察序列相结合.
IObservable.Zip()接近于做我想要的,但它只有在原始流比定时器更快地产生事件时才有效.一旦原始流中存在显着的间歇,计时器就会建立一系列不需要的事件,然后立即与原始流中的下一个事件突发事件配对.
理想情况下,我想要一个具有以下函数签名的IObservable扩展方法,该方法生成我上面概述的bevaior.现在,来救我的StackOverflow :)
public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)
Run Code Online (Sandbox Code Playgroud)
PS.我是Rx的新手,所以如果这是一个简单的简单问题我很抱歉......
这是我最初的天真和简单的解决方案,它有很多问题:
public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
Queue<T> q = new Queue<T>();
source.Subscribe(x => q.Enqueue(x));
return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}
Run Code Online (Sandbox Code Playgroud)
第一个明显的问题是内部订阅返回到原始源的IDisposable丢失,因此订阅无法终止.在此方法返回的IDisposable上调用Dispose会终止计时器,但不会触发现在不必要地填充队列的基础原始事件源,没有人从队列中提取事件.
第二个问题是,从原始事件流到缓冲流,无法通过异常或流末尾通知进行传播 - 在订阅原始源时,它们将被忽略.
最后但并非最不重要的是,现在我已经有了定期唤醒的代码,无论是否有任何工作要做,我宁愿避免在这个美妙的新反应世界中.
为了解决我最初的简单的方法遇到的问题,我写了很多更复杂的功能,其行为很像IObservable.Delay()(我用.net反射来读取这些代码,并用它作为我的函数的基础上).不幸的是,很多样板逻辑AnonymousObservable都不能在system.reactive代码之外公开访问,因此我不得不复制并粘贴大量代码.这个解决方案似乎有效,但考虑到它的复杂性,我对它的bug没有信心.
我无法相信没有办法使用标准的Reactive扩展的某些组合来实现这一点.我讨厌感觉我不必要地重新发明轮子,我试图建立的模式似乎是一个相当标准的模式.
我在System.Net.Http中使用HTTPClient来对API发出请求.API限制为每秒10个请求.
我的代码大致是这样的:
List<Task> tasks = new List<Task>();
items..Select(i => tasks.Add(ProcessItem(i));
try
{
await Task.WhenAll(taskList.ToArray());
}
catch (Exception ex)
{
}
Run Code Online (Sandbox Code Playgroud)
ProcessItem方法做了一些事情,但总是使用以下方法调用API :
await SendRequestAsync(..blah). 看起来像:
private async Task<Response> SendRequestAsync(HttpRequestMessage request, CancellationToken token)
{
token.ThrowIfCancellationRequested();
var response = await HttpClient
.SendAsync(request: request, cancellationToken: token).ConfigureAwait(continueOnCapturedContext: false);
token.ThrowIfCancellationRequested();
return await Response.BuildResponse(response);
}
Run Code Online (Sandbox Code Playgroud)
最初代码工作正常,但是当我开始使用Task.WhenAll时,我开始从API获得"超出速率限制"消息.如何限制请求的速率?
值得注意的是,ProcessItem可以根据项目进行1-4次API调用.
你如何限制每秒的操作次数?
假设我们必须将文件从一个位置复制到另一个位置,并且我们不希望每秒处理超过5个文件.
请看看我在做什么
private static string currentStamp;
private static int processedInCurrentStamp = 0;
private static void Main(string[] args)
{
currentStamp = DateTime.Now.ToString("{0:d/M/yyyy HH:mm:ss}");
Run();
}
private static void Run()
{
for (int i = 0; i < Int32.MaxValue; i++)
{
string s = DateTime.Now.ToString("{0:d/M/yyyy HH:mm:ss}");
if (currentStamp.Equals(s))
{
if (processedInCurrentStamp < 5)
{
ProcessItem();
processedInCurrentStamp++;
}
}
else
{
Console.WriteLine("{0} ::: {1}", currentStamp, processedInCurrentStamp);
currentStamp = s;
processedInCurrentStamp = 0;
}
}
}
Run Code Online (Sandbox Code Playgroud)
但我需要一种更优雅和防弹的方式.