如何从flatMapIterable合并项目

mew*_*ewa 5 java rx-java

我有一个无限的可观察对象(db观察者),它发出有限的项目序列,需要处理这些项目,然后再作为一个组发出。

这里的问题是,如何编写它以便toList不等待原始源完成,而要等待flatMapIterable生成的序列。

DbObservable.map(new Func1<Query, List<Item>>() {
                        @Override
                        public List<Item> call(Query query) {
                            return query.items; //get items from db query
                        }
                    })
                    .flatMapIterable(new Func1<List<Item>, Iterable<Item>>() {
                        @Override
                        public Iterable<Item> call(List<GeoStop> geoStops) {
                            return geoStops;
                        }
                    })
                    .flatMap(/*process*/)
                    .toList() // regenerate List that was passed in to flatMapIterable
                    //subscribe and emit list of items
Run Code Online (Sandbox Code Playgroud)

没有下游用户因toList被卡住等待DbObservableonComplete

And*_*ion 5

toList()等待onCompleted()事件但flatMap(/*process*/)不传播完成。

所以,你需要在一个新的内部调用它们 flatMap()

db.map(q -> q.items)
    .flatMap(items -> Observable.from(items)
        .flatMapIterable(items)
        .flatMap(/*process*/)
        .toList()
    )
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)


aka*_*okd 4

您可以使用 flatMap+toList 处理第一个 flatMap 内的列表:

db.map(q -> q.items)
  .flatMap(items -> Observable.from(items).flatMap(...).toList())
  .subscribe(...)
Run Code Online (Sandbox Code Playgroud)