在Rx中,如何在一段时间后对最新项目进行分组?

Tho*_*que 7 c# system.reactive

对不起,如果标题不是很清楚,我想不出更好的......

我正在以a的形式接收用户输入IObservable<char>,并且我想IObservable<char[]>通过在每次用户停止输入超过1秒时对字符进行分组来将其转换为a .因此,例如,如果输入如下:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)
Run Code Online (Sandbox Code Playgroud)

我希望输出可观察到:

['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']
Run Code Online (Sandbox Code Playgroud)

我怀疑该解决方案是相当简单的,但我不能找到正确的方法......我试图用Buffer,GroupByUntil,Throttle和其他几个人,但没有成功.

任何想法都会受到欢迎!


编辑:我有一些几乎可行的东西:

        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);
Run Code Online (Sandbox Code Playgroud)

但是每次键入新字符时我都需要重置延迟...

Gid*_*rth 7

BufferThrottle如果你的来源很热,那就足够了.为了使它变热,您可以使用.Publish().RefCount()以确保您最终只有一个订阅源.

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                              TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    //defer dueTime checking to Throttle
    var hot = source.Publish().RefCount();
    return hot.Buffer(() => hot.Throttle(dueTime));
}
Run Code Online (Sandbox Code Playgroud)