Chr*_*Fin 7 c# buffer batching system.reactive
在我使用.Net 4.6的WPF应用程序中,我有一个以高速率(每秒几百个)激发新数据点的事件,但不是所有时间.此数据显示在图表中.
我想每50毫秒更新一次图表,而不是每个新数据点之后.
为了达到这个目的,我使用Buffer(TimeSpan.FromMilliseconds(50))了Rx,这在理论上运行良好.但是如果没有创建新的数据点,我的订户也会每50毫秒被调用一次,这不是我想要的.
我创建了一个小样本应用程序来测试它:
using System;
using System.Reactive.Linq;
namespace RxTester
{
public class Program
{
private static event EventHandler TheEvent;
static void Main(string[] args)
{
var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
.Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));
var random = new Random();
var timer = new System.Timers.Timer(2000)
{
AutoReset = true,
Enabled = true
};
timer.Elapsed += (s, e) =>
{
var amount = random.Next(1, 10);
for (int i = 0; i < amount; ++i)
TheEvent?.Invoke(null, null);
};
Console.ReadLine();
timer.Enabled = false;
subscriber.Dispose();
}
}
}
Run Code Online (Sandbox Code Playgroud)
您需要添加"Rx-Linq"NuGet包才能运行或使用以下小提琴:https://dotnetfiddle.net/TV5tD4
在那里你会看到几个"收到的0个元素",这是我想要避免的.我知道我可以简单地检查一下e.Count == 0,但是当我使用多个这样的缓冲区时,这对我来说似乎不是最佳选择.
如果元素可用,有没有办法只创建新的缓冲元素块?
我也开放其他方法来解决我按时间批量处理事件的问题 - 我已经查看了TPL数据流BatchBlock,但这似乎只支持基于计数的块大小.
我们可以再次使用强大的GroupByUntil方法来创建这个扩展
public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
(this IObservable<TSource> source,
TimeSpan threshold)
{
return source.Publish( sp =>
sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
.SelectMany(i => i.ToList()));
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
944 次 |
| 最近记录: |