RxJava缓冲区直到更改

Man*_*anu 2 rx-java rx-java2

我有一个可观察的东西,它发出大量的数据,例如

[1,1,1,2,2,2,3,3,1,1,5,5 ......]

在RxJava中,我们可以使用distinctUntilChanged()来获取一个独特的项目,直到它发生变化为止,并且它会

[1,2,3,1,5,......]

同样,有一种方法可以缓冲项目直到更改?例如,我希望输出像

[[1,1,1],[2,2,2],[3,3],[1,1],[5,5] ......]

aka*_*okd 6

您可以共享源序列,应用于distinctUntilChanged一条路径,然后驱动buffer使用Observable来指示边界的运算符:

@Test
@SuppressWarnings("unchecked")
public void test() {
    Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
    .compose(bufferUntilChanged(v -> v))
    .test()
    .assertResult(
            Arrays.asList(1, 1, 1),
            Arrays.asList(2, 2, 2),
            Arrays.asList(3, 3),
            Arrays.asList(1, 1),
            Arrays.asList(5, 5)
        );
}

static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
        Function<T, K> keySelector) {
    return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}
Run Code Online (Sandbox Code Playgroud)

skip(1)之所以存在,是因为通过的第一个项目distinctUntilChanged将触发一个新缓冲区,使第一个缓冲区为空。