我有一个无限的可观察对象(db观察者),它发出有限的项目序列,需要处理这些项目,然后再作为一个组发出。
这里的问题是,如何编写它以便toList不等待原始源完成,而要等待flatMapIterable生成的序列。
DbObservable.map(new Func1<Query, List<Item>>() {
@Override
public List<Item> call(Query query) {
return query.items; //get items from db query
}
})
.flatMapIterable(new Func1<List<Item>, Iterable<Item>>() {
@Override
public Iterable<Item> call(List<GeoStop> geoStops) {
return geoStops;
}
})
.flatMap(/*process*/)
.toList() // regenerate List that was passed in to flatMapIterable
//subscribe and emit list of items
Run Code Online (Sandbox Code Playgroud)
没有下游用户因toList被卡住等待DbObservable的onComplete。
toList()等待onCompleted()事件但flatMap(/*process*/)不传播完成。
所以,你需要在一个新的内部调用它们 flatMap()
db.map(q -> q.items)
.flatMap(items -> Observable.from(items)
.flatMapIterable(items)
.flatMap(/*process*/)
.toList()
)
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
您可以使用 flatMap+toList 处理第一个 flatMap 内的列表:
db.map(q -> q.items)
.flatMap(items -> Observable.from(items).flatMap(...).toList())
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1321 次 |
| 最近记录: |