Mal*_*alt 9 c# java reactive-programming system.reactive rx-java
我有一个RX生成器,它创建一个像这样的字符串流(真实流的简化版本):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
流是无穷无尽的,但有序.因此,在以A用完为止的字符串之后,B启动.当B用完,C开始......当Z用完了,我们转移到AA1等有数目不详A的,B的等,但它通常每个字母10-30实例.
我正在寻找一种方法将这个流划分为所有A的块:A1 A2 A3,所有B:B1 B2所有C:C1 C2 C3 C4 C5 C6等等.每个块可以是一个可观察的(我将变成一个列表)或只是一个列表.
我使用RxJava尝试了几种不同的方法,所有方法都失败了.其中不起作用的是:
分组依据:由于流是无穷无尽的,因此每个字母的可观察量不会完成.所以当A的用完和B的开始时,A的Observable没有完成.因此,可观察量越来越多.
具有distinctUntilChanged的窗口/缓冲区 - 我在原始流上使用"distinctUntilChanged"来输出每个组的第一个项目(第一个A,第一个B等).然后我使用该流作为输入window或"缓冲"运算符用作窗口/缓冲区之间的边界.这不起作用,我得到的只是空列表.
什么是使用RX的正确解决方案?我更喜欢Java解决方案,但是其他RX实现中可以轻松转换为Java的解决方案也非常受欢迎.
你可以使用rxjava-extras .toListWhile:
Observable<String> source =
Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
(list, t) -> list.isEmpty()
|| list.get(0).charAt(0) == t.charAt(0)))
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
它完成了@akarnokd在封面下所做的事情,并进行了单元测试.
这是我解决这个问题的方法:
Observable<String> source = Observable.from(
"A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
Observable<List<String>> output = Observable.defer(() -> {
List<String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap(new Function<String, Observable<List<String>>>() {
String lastKey;
@Override
public Observable<List<String>> apply(String t) {
String key = t.substring(0, 1);
if (lastKey != null && !key.equals(lastKey)) {
List<String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v -> {
if (buffer.isEmpty()) {
return Observable.empty();
}
return Observable.just(buffer);
})
);
}
);
output.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
它是这样工作的: