小编use*_*094的帖子

如何等待异步Observable完成

我正在尝试使用rxjava构建一个示例.该示例应协调ReactiveWareService和ReactiveReviewService重新运行WareAndReview组合.

ReactiveWareService
        public Observable<Ware> findWares() {
        return Observable.from(wareService.findWares());
    }

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency!

public Observable<Review> findReviewsByItem(final String item) {
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
    try {
        List<Review> reviews = reviewService.findReviewsByItem(item);
        reviews.forEach(observer::onNext);
        observer.onCompleted();
    } catch (Exception e) {
        observer.onError(e);
    }
}));
}

public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
final List<WareAndReview> wareAndReviews = new ArrayList<>();

wareService.findWares()
    .map(WareAndReview::new)
.subscribe(wr -> {
        wareAndReviews.add(wr);
        //Async!!!!
        reviewService.findReviewsByItem(wr.getWare().getItem())
            .subscribe(wr::addReview,
                throwable -> System.out.println("Error while trying to find reviews for " + …
Run Code Online (Sandbox Code Playgroud)

java rx-java

12
推荐指数
2
解决办法
4万
查看次数

标签 统计

java ×1

rx-java ×1