RxJava:计数或时间范围内的窗口,不丢弃元素

jcf*_*ino 4 rx-java

我想以一定的最大尺寸分批我的流,如果在一段时间后没有达到这个尺寸,则关闭批次并开始一个新的.为此,我尝试使用窗口(计数):

things.window(10)
Run Code Online (Sandbox Code Playgroud)

然而,这将等待直到接收到10个元素以发出新的Observable窗口.如果我使用窗口(timespan,unit,count)运算符:

things.window(1, TimeUnit.SECONDS, 10)
Run Code Online (Sandbox Code Playgroud)

我将放弃在10日之后和时间跨度完成之前的所有元素.

我想要一个类似的运算符,而不是等到时间跨度完成时,在达到计数时发出一个新的窗口.

things.windowXXX(timespan = 1s, count = 2) : Observable[T]

things:   ----o--o--o-----------o----o------->
timespan: [      )[    1s    ][      )[    -->
window 1: -----o-o-|
window 2:          -o---------|
window 3:                     --o-----o-| 
Run Code Online (Sandbox Code Playgroud)

jcf*_*ino 6

我找到了一种方法来解决这个问题,使用缓冲区(timespan,timeunit,count)而不是window.我认为缓冲区和窗口只有一个发出列表和其他可观察对象的行为相同.但似乎这里存在差异.

我的解决方案是使用缓冲区,然后将每个结果映射到一个observable:

observable.buffer(10, TimeUnit.MILLISECONDS, 2)
          .map(new Func1<List<String>, Observable<String>>() {
                public Observable<String> call(List<String> l) {
                    return Observable.from(l);
                }
           })
Run Code Online (Sandbox Code Playgroud)

我做了一些测试来看看差异:https://gist.github.com/jcfandino/fd47277ada821f51a9d4

    observable.map(delayOn4).window(10, TimeUnit.MILLISECONDS, 2)
            .subscribe(new UpdateCountdowns("window"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));
Run Code Online (Sandbox Code Playgroud)

打印:

窗口 - 1968827463:1
窗口 - 1968827463:2
窗口 - 1968827463:3
窗口 - 1267747034:4
窗口 - 1267747034:5

请注意,它会发出两个批次而不是三个,第一个有三个元素而不是两个.

另一方面,通过使用缓冲区:

    observable.map(delayOn4).buffer(10, TimeUnit.MILLISECONDS, 2)
            .map(new Func1<List<String>, Observable<String>>() {
                public Observable<String> call(List<String> l) {
                    return Observable.from(l);
                }
            }).subscribe(new UpdateCountdowns("buffer"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));
Run Code Online (Sandbox Code Playgroud)

打印:

缓冲区 - 1257525795:1
缓冲区 - 1257525795:2
缓冲区 - 1849466438:3
缓冲区 - 104886386:4
缓冲区 - 104886386:5

三个批次最多有两个元素,因为"4"到达较晚,它在下一批中结束.

我不知道这是否是预期的行为,或者可能存在错误.