Rx - 按条件将流划分为段(列表)

Mal*_*alt 9 c# java reactive-programming system.reactive rx-java

我有一个RX生成器,它创建一个像这样的字符串流(真实流的简化版本):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

流是无穷无尽的,但有序.因此,在以A用完为止的字符串之后,B启动.当B用完,C开始......当Z用完了,我们转移到AA1等有数目不详A的,B的等,但它通常每个字母10-30实例.

我正在寻找一种方法将这个流划分为所有A的块:A1 A2 A3,所有B:B1 B2所有C:C1 C2 C3 C4 C5 C6等等.每个块可以是一个可观察的(我将变成一个列表)或只是一个列表.

我使用RxJava尝试了几种不同的方法,所有方法都失败了.其中不起作用的是:

  • 分组依据:由于流是无穷无尽的,因此每个字母的可观察量不会完成.所以当A的用完和B的开始时,A的Observable没有完成.因此,可观察量越来越多.

  • 具有distinctUntilChanged的窗口/缓冲区 - 我在原始流上使用"distinctUntilChanged"来输出每个组的第一个项目(第一个A,第一个B等).然后我使用该流作为输入window或"缓冲"运算符用作窗口/缓冲区之间的边界.这不起作用,我得到的只是空列表.

什么是使用RX的正确解决方案?我更喜欢Java解决方案,但是其他RX实现中可以轻松转换为Java的解决方案也非常受欢迎.

Dav*_*ten 5

你可以使用rxjava-extras .toListWhile:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

它完成了@akarnokd在封面下所做的事情,并进行了单元测试.


aka*_*okd 4

这是我解决这个问题的方法:

Observable<String> source = Observable.from(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

它是这样工作的:

  • 我使用 defer 是因为我们需要每个订阅者的缓冲区,而不是全局缓冲区
  • 如果源恰好是有限的,我会将缓冲与最后一个缓冲区的发射连接起来
  • 我使用 concatMap 并添加到缓冲区,直到键发生变化,直到那时,我发出空的 Observables。一旦密钥发生变化,我就会发出缓冲区的内容并开始一个新的。