以下代码:
Observable
.just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.doOnNext(item -> System.out.println("source emitting " + item))
.groupBy(item -> {
System.out.println("groupBy called for " + item);
return item % 3;
})
.subscribe(observable -> {
System.out.println("got observable " + observable + " for key " + observable.getKey());
observable.subscribe(item -> {
System.out.println("key " + observable.getKey() + ", item " + item);
});
});
Run Code Online (Sandbox Code Playgroud)
让我感到困惑.我得到的输出是:
source emitting 0
groupBy called for 0
got observable rx.observables.GroupedObservable@42110406 for key 0
key 0, item 0
source emitting 1
groupBy called for 1
got observable rx.observables.GroupedObservable@1698c449 for key 1
key 1, item 1
source emitting 2
groupBy called for 2
got observable rx.observables.GroupedObservable@5ef04b5 for key 2
key 2, item 2
source emitting 3
groupBy called for 3
key 0, item 3
source emitting 4
groupBy called for 4
key 1, item 4
source emitting 5
groupBy called for 5
key 2, item 5
source emitting 6
groupBy called for 6
key 0, item 6
source emitting 7
groupBy called for 7
key 1, item 7
source emitting 8
groupBy called for 8
key 2, item 8
source emitting 9
groupBy called for 9
key 0, item 9
Run Code Online (Sandbox Code Playgroud)
因此,在顶级订阅方法中,我按预期从GroupedObservable获得3个可观察对象.然后,我一个接一个地订阅分组的observable - 这里我不明白的事情:
为什么原始项目仍以原始序列(即0,1,2,3 ......)发出,而不是0,3,6,9 ......用于键0,后面是1,4,7为键1,键2的后面是2,5,8;
我想我了解如何创建组:
1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists, it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2 as they both create new groups, and observables with key 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists, it does -> no new grouped observable is created nor emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
Run Code Online (Sandbox Code Playgroud)
似乎虽然我逐个获得了分组的可观测量,但它们的排放在某种程度上是交错的.这是怎么发生的?
为什么原始项仍然按原始序列发出(即 0、1、2、3、...),而不是对于键 0 发出 0、3、6、9...,然后对于键发出 1、4、7 1,然后是键 2 的 2、5、8?
你已经回答了你自己的问题。您正在按项目发出的顺序对项目流进行操作。因此,当每个发出时,它都会沿着运算符链传递,您会看到此处显示的输出。
您期望的替代输出要求链等待,直到源停止为所有组发出项目。说你有Observable.just(0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 0)。那么您期望 (0, 3, 0), (1, 4, 4, 4, 4, 4, 4), (2) 作为输出组。如果你有无限的 4 流怎么办?您的订阅者永远不会从第一组收到 0, 3..。
您可以创建您正在寻找的行为。操作toList符将缓存输出直到源完成,然后将 a 传递List<R>给订阅者:
.subscribe(observable -> {
System.out.println("got observable " + observable + " for key " + observable.getKey());
observable.toList().subscribe(items -> {
// items is a List<Integer>
System.out.println("key " + observable.getKey() + ", items " + items);
});
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4571 次 |
| 最近记录: |