gal*_*nus 3 .net c# events reactive-programming system.reactive
我的问题是:对于给定的事件序列,我想缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存数据并清除缓存状态。
一种天真的方法是(不是工作代码,可能存在一些错误):
struct FlaggedData
{
public EventData Data { get; set; }
public bool Reset { get; set; }
}
...
IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();
var flaggedDataStream = eventsStream
.Select(data => new FlaggedData { Data = data })
.Merge(resetSignal)
.Scan(
new List<EventData>(),
(cache, flaggedData) =>
{
if (!flaggedData.Reset())
{
cache.Add(flaggedData.Data);
return cache;
}
return new List<EventData>();
})
.Throttle(SomePeriodOfTime)
.Subscribe(batch =>
{
resetSignal.OnNext(new FlaggedData { Reset = true});
ProcessBatch(batch);
});
Run Code Online (Sandbox Code Playgroud)
所以在这里,在收到任何要处理的批处理后,我请求重置缓存。问题是因为Throttle缓存中可能有一些数据(或者我相信),在这种情况下会丢失。
我想要的是一些操作,如:
ScanWithThrottling<TAccumulate, TSource>(
IObservable<TSource> source,
Func<TAccumulate, TSource, TAccumulate> aggregate,
TimeSpan throttlingSpan)
Run Code Online (Sandbox Code Playgroud)
它返回一个 observable,它会在每次调用OnNext其订阅者时重置累积值。
当然,我可以编写自己的扩展,但问题是是否有某种方法可以使用标准 Rx 操作实现相同的效果。
我认为这里有一个简单的方法。用于Buffer()基于这样的节流阀缓冲元素:
var buffered = source.Publish(ps =>
ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));
Run Code Online (Sandbox Code Playgroud)
这将缓冲元素,直到出现 SomePeriodOfTime 间隙并将它们显示为列表。无需担心“重置”方面,您不会丢失元素。
的使用Publish确保对源事件有一个单一的共享订阅,可供Buffer和 每个Throttle. 节流阀是缓冲区关闭功能,提供指示应启动新缓冲区的信号。
这是一个可测试的版本 - 我只是在这里转储每个缓冲区的长度并Timestamp用于添加时间信息,但它IList<T>是您在原始缓冲流上获得的。请注意调度程序如何作为基于时间的操作的参数提供以启用测试。
请注意,您将需要 nuget 包 rx-testing 来运行此示例,以引入 Rx 测试框架并获取TestScheduler并ReactiveTest键入:
void Main()
{
var scenarios = new Scenarios();
scenarios.Scenario1();
}
public class Scenarios : ReactiveTest
{
public void Scenario1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1400, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9));
var duration = TimeSpan.FromTicks(300);
var buffered = source.Publish(ps =>
ps.Buffer(() => ps.Throttle(duration, scheduler)));
buffered.Timestamp(scheduler).Subscribe(
x => Console.WriteLine("Timestamp: {0} Value: {1}",
x.Timestamp.Ticks, x.Value.Count()));
scheduler.Start();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1610 次 |
| 最近记录: |