使用 Observable.FromEventPattern 在不活动或计数后执行操作

Ste*_*hen 5 c# system.reactive

我有一个从事件模式创建的可观察流,如下所示。

var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
            h => keyspaceMonitor.KeySpaceChanged += h,
            h => keyspaceMonitor.KeySpaceChanged -= h);
Run Code Online (Sandbox Code Playgroud)

我想要做的是订阅流并在有 10 秒不活动(没有发生任何事件)或 100 个事件在没有执行该方法的情况下触发时执行一个方法。这是为了避免每 5 秒触发一次事件并且从不调用 onNext 方法的情况。

我怎样才能做到这一点?我知道如何做第一部分(见下文),但我不知道如何做计数逻辑。请注意,我已经知道如何订阅流。

var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));
Run Code Online (Sandbox Code Playgroud)

任何帮助将非常感激!谢谢你。

Tim*_*lds 4

Buffer与自定义一起使用bufferClosingSelector。这里的想法是,每个缓冲区都应该在项目之后maxDuration或之后关闭maxCount,以较早者为准。每次缓冲区关闭时,都会打开一个新缓冲区。

var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});
Run Code Online (Sandbox Code Playgroud)

我假设您提供了IScheduler scheduler. 的类型throttledStream将是IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>.