Rx for .Net:如何将 Scan 与 Throttle 结合使用

gal*_*nus 3 .net c# events reactive-programming system.reactive

我的问题是:对于给定的事件序列,我想缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存数据并清除缓存状态。

一种天真的方法是(不是工作代码,可能存在一些错误):

struct FlaggedData
{
    public EventData Data { get; set; }
    public bool Reset { get; set; }
}

...

IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();

var flaggedDataStream = eventsStream
    .Select(data => new FlaggedData { Data = data })
    .Merge(resetSignal)
    .Scan(
        new List<EventData>(),
        (cache, flaggedData) =>
        {
            if (!flaggedData.Reset())
            {
                cache.Add(flaggedData.Data);
                return cache;
            }

            return new List<EventData>();
        })
    .Throttle(SomePeriodOfTime)
    .Subscribe(batch => 
        {
            resetSignal.OnNext(new FlaggedData { Reset = true});
            ProcessBatch(batch);
        });
Run Code Online (Sandbox Code Playgroud)

所以在这里,在收到任何要处理的批处理后,我请求重置缓存。问题是因为Throttle缓存中可能有一些数据(或者我相信),在这种情况下会丢失。

我想要的是一些操作,如:

ScanWithThrottling<TAccumulate, TSource>(
    IObservable<TSource> source,
    Func<TAccumulate, TSource, TAccumulate> aggregate,
    TimeSpan throttlingSpan)
Run Code Online (Sandbox Code Playgroud)

它返回一个 observable,它会在每次调用OnNext其订阅者时重置累积值。

当然,我可以编写自己的扩展,但问题是是否有某种方法可以使用标准 Rx 操作实现相同的效果。

Jam*_*rld 5

我认为这里有一个简单的方法。用于Buffer()基于这样的节流阀缓冲元素:

var buffered = source.Publish(ps =>        
    ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));
Run Code Online (Sandbox Code Playgroud)

这将缓冲元素,直到出现 SomePeriodOfTime 间隙并将它们显示为列表。无需担心“重置”方面,您不会丢失元素。

的使用Publish确保对源事件有一个单一的共享订阅,可供Buffer和 每个Throttle. 节流阀是缓冲区关闭功能,提供指示应启动新缓冲区的信号。

这是一个可测试的版本 - 我只是在这里转储每个缓冲区的长度并Timestamp用于添加时间信息,但它IList<T>是您在原始缓冲流上获得的。请注意调度程序如何作为基于时间的操作的参数提供以启用测试。

请注意,您将需要 nuget 包 rx-testing 来运行此示例,以引入 Rx 测试框架并获取TestSchedulerReactiveTest键入:

void Main()
{
    var scenarios = new Scenarios();
    scenarios.Scenario1();
}

public class Scenarios : ReactiveTest
{
    public void Scenario1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1400, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9));

    var duration = TimeSpan.FromTicks(300);

    var buffered = source.Publish(ps =>        
        ps.Buffer(() => ps.Throttle(duration, scheduler)));

        buffered.Timestamp(scheduler).Subscribe(
            x => Console.WriteLine("Timestamp: {0} Value: {1}",
                 x.Timestamp.Ticks, x.Value.Count()));

        scheduler.Start();

    }
}
Run Code Online (Sandbox Code Playgroud)