How do I use one RxJava Observable before it has completed?

the*_*del 9 android rx-java

I have two RxJava Observables. I get an ArrayList from the first observable and then use it to fetch data from the other observable.

Observable<KarobarTvVod> observable1 = youtubeDataHelper.getTVData();
    observable1.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(Schedulers.io())
            .subscribe(new Subscriber<KarobarTvVod>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(KarobarTvVod karobarTvVod) {
                    Log.d(TAG, "onNext: size" + karobarTvVod.getEtag());
                    tvObjArrayList = new ArrayList<TVObj>();
                    for (int i = 0; i < karobarTvVod.getItems().size(); i++) {
                        TVObj tvObj = new TVObj();
                        tvObj.setVideoDate(karobarTvVod.getItems().get(i).getSnippet().getPublishedAt());
                        tvObj.setVideoIcon(karobarTvVod.getItems().get(i).getSnippet().getThumbnails().getHigh().getUrl());
                        tvObj.setVideoTitle(karobarTvVod.getItems().get(i).getSnippet().getTitle());
                        tvObj.setVideoID(karobarTvVod.getItems().get(i).getId().getVideoId());
                        tvObjArrayList.add(tvObj);
                    }


                }
            });



    Observable<YoutubeViews> observable2 = youtubeDataHelper.getTVDataViews(tvObjArrayList);
    observable2.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(Schedulers.io())
            .subscribe(new Subscriber<YoutubeViews>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: in 2nd obs");
                    e.printStackTrace();
                }

                @Override
                public void onNext(YoutubeViews youtubeViews) {
                    Log.d(TAG, "onNext: views" + youtubeViews.getEtag());
                    viewsList = new ArrayList<String>();
                    for (int i = 0; i < youtubeViews.getItems().size(); i++) {

                        viewsList.add(youtubeViews.getItems().get(i).getStatistics().getViewCount());
                    }
                    tvView.displayList(tvObjArrayList, viewsList);
                }
            });

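This is just sample code. I need to pass tvObjArrayList to the second observable once the first observable has populated it. What is the best practice for doing this? Also, I am using a for loop in the first Observable; is there a better way to implement that with RxJava? Thanks.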

Bar*_*ski 5

You should use the flatMap operator. It doesn't get any easier than that.

Observable<KarobarTvVod> observable1 = youtubeDataHelper.getTVData();
observable1.subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
        .flatMap(new Func1<KarobarTvVod, Observable<YoutubeViews>>() {
            @Override
            public Observable<YoutubeViews> call(KarobarTvVod karobarTvVod) {
                Log.d(TAG, "onNext: size" + karobarTvVod.getEtag());
                // Build the list from the first response before requesting the views.
                tvObjArrayList = new ArrayList<TVObj>();
                for (int i = 0; i < karobarTvVod.getItems().size(); i++) {
                    TVObj tvObj = new TVObj();
                    tvObj.setVideoDate(karobarTvVod.getItems().get(i).getSnippet().getPublishedAt());
                    tvObj.setVideoIcon(karobarTvVod.getItems().get(i).getSnippet().getThumbnails().getHigh().getUrl());
                    tvObj.setVideoTitle(karobarTvVod.getItems().get(i).getSnippet().getTitle());
                    tvObj.setVideoID(karobarTvVod.getItems().get(i).getId().getVideoId());
                    tvObjArrayList.add(tvObj);
                }
                // The second observable is only created once the list is filled.
                return youtubeDataHelper.getTVDataViews(tvObjArrayList);
            }
        })
        // Switch to the main thread after flatMap, so the second request is not
        // started on the main thread (unless getTVDataViews sets its own scheduler).
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<YoutubeViews>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: in 1st or 2nd obs");
                e.printStackTrace();
            }

            @Override
            public void onNext(YoutubeViews youtubeViews) {
                Log.d(TAG, "onNext: views" + youtubeViews.getEtag());
                viewsList = new ArrayList<String>();
                for (int i = 0; i < youtubeViews.getItems().size(); i++) {

                    viewsList.add(youtubeViews.getItems().get(i).getStatistics().getViewCount());
                }
                tvView.displayList(tvObjArrayList, viewsList);
            }
        });
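
On the second part of the question (avoiding the for loop), one possible approach with RxJava 1.x is to turn the item list into an Observable with from, convert each element with map, and collect the results with toList before handing them to the second call via flatMap. The following is only a rough sketch under assumptions: Item, TVObj, getTVData and getTVDataViews here are simplified, hypothetical stand-ins for the classes and helper methods in the question, and they return canned data instead of calling YouTube. It also assumes Java 8 lambdas are available.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class FlatMapSketch {

    // Hypothetical, simplified stand-in for one item of the first response.
    static class Item {
        final String videoId;
        final String title;

        Item(String videoId, String title) {
            this.videoId = videoId;
            this.title = title;
        }
    }

    // Hypothetical, simplified stand-in for the question's TVObj.
    static class TVObj {
        final String videoId;
        final String title;

        TVObj(String videoId, String title) {
            this.videoId = videoId;
            this.title = title;
        }
    }

    // Stand-in for youtubeDataHelper.getTVData(): emits one list of items.
    static Observable<List<Item>> getTVData() {
        return Observable.just(Arrays.asList(
                new Item("a1", "First"),
                new Item("b2", "Second")));
    }

    // Stand-in for youtubeDataHelper.getTVDataViews(list): emits one entry per video.
    static Observable<List<String>> getTVDataViews(List<TVObj> videos) {
        List<String> views = new ArrayList<>();
        for (TVObj video : videos) {
            views.add(video.videoId + ":100");
        }
        return Observable.just(views);
    }

    // Replaces the index-based loop: from -> map -> toList.
    static Observable<List<TVObj>> toTvObjs(List<Item> items) {
        return Observable.from(items)
                .map(item -> new TVObj(item.videoId, item.title))
                .toList();
    }

    // Chains both calls; the second one starts only after the list is complete.
    static List<String> loadViews() {
        return getTVData()
                .flatMap(items -> toTvObjs(items))
                .flatMap(tvObjs -> getTVDataViews(tvObjs))
                .toBlocking() // blocking is for this demo only, not for Android UI code
                .single();
    }

    public static void main(String[] args) {
        System.out.println(loadViews());
    }
}
```

In the real app you would keep subscribeOn/observeOn and subscribe as in the answer instead of toBlocking. Note that this sketch only passes the view counts downstream; since the subscriber in the answer needs both tvObjArrayList and viewsList, you would still have to keep the list in a field as above, or emit both together in some small holder object.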