自定义筛选器运算符RxJava

Ume*_*ooq 1 java android rx-android reactive rx-java2

我正试图拥抱RxJava的荣耀并将其集成到我的应用程序中.我已经编写了以下代码来添加漫画,其累积成本不超过定义的预算.为实现这一目标,我编写了2个实现.

  1. Observable.create()不鼓励的用途主要是因为订阅和背压的复杂性
  2. 使用RxAndroid lib中已有的运算符.

如果暂时将Subscription和Backpressure处理放在一边,我希望得到关于哪些实现在性能,内存消耗和简单性方面更好的反馈Observable.create()

第一次实施:

Observable<Integer> filterObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Timber.d("filterComicsAccordingToBudget():subscribe");
        int pageCountOfComicsWithInBudget = 0;
        double totalCost = 0.0;
        for(MarvelComic comic : getMarvelComicsList()) {
            totalCost += Double.valueOf(comic.getPrice());
            Timber.d("totalCost: %s budget: %s priceOfComic: %s", totalCost, budget, comic.getPrice());
            if(totalCost > budget) {
                break;
            }
            pageCountOfComicsWithInBudget += Integer.valueOf(comic.getPageCount());
            Timber.d("pageCount: %s price: %s comicName: %s totalPages: %s", comic.getPageCount(), comic.getPrice(), comic.getTitle(), pageCountOfComicsWithInBudget);
            e.onNext(pageCountOfComicsWithInBudget);
        }
        e.onComplete();
    }
});

filterObservable.subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Integer>() {
        int comicCount = 0;
        int pageCountOfComicsWithInBudget = 0;

        @Override
        public void onSubscribe(Disposable d) {
            Timber.d("filterComicsAccordingToBudget():onSubscribe");
        }

        @Override
        public void onNext(Integer pageCountOfComicsWithInBudget) {
            Timber.d("filterComicsAccordingToBudget():onNext");
            comicCount++;
        }

        @Override
        public void onError(Throwable e) {
            Timber.e("onFilterComicsForBudget:onError() %s", e);
        }

        @Override
        public void onComplete() {
            Timber.d("filterComicsAccordingToBudget():onComplete");
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

第二次实施:

Observable.fromIterable(getMarvelComicsList())
    .map(new Function<MarvelComic, HashMap<String, Double>>() {
        HashMap<String, Double> myMap = new HashMap<String, Double>();
        double count = 0;

        @Override
        public HashMap<String, Double> apply(@NonNull MarvelComic marvelComic) throws Exception {
            myMap.put("price", Double.valueOf(marvelComic.getPrice()));
            myMap.put("pageCount", Double.valueOf(marvelComic.getPageCount()));
            myMap.put("comicsCount", count++);
            return myMap;
        }
    })
    .takeWhile(new Predicate<HashMap<String, Double>>() {

        double sum;

        @Override
        public boolean test(@NonNull HashMap<String, Double> map) throws Exception {
            Timber.e("sum is: %s", sum);
            return (sum += map.get("price")) < 5.00;
        }
    })
    .subscribe(new Observer<HashMap<String, Double>>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(HashMap<String, Double> map) {
            Timber.e("value in onNext is: %s %s %s", map.get("pageCount"), map.get("price"), map.get("comicsCount"));
        }

        @Override
        public void onError(Throwable e) {
            Timber.e("onError()!!!   %s",e);
        }

        @Override
        public void onComplete() {
            Timber.e("onComplete()!!!");
        }
    });
Run Code Online (Sandbox Code Playgroud)

我有点喜欢第一个实现,因为它比我习惯的更加迫切,对我来说似乎不那么笨拙但是考虑到我对RxJava的有限知识,我可能完全错了.

GVi*_*i82 5

我会避免Observable为这种操作创建自定义.您可以使用普通的RxJava运算符完成所有操作.

在飞行中我会做这样的事情:

private Observable<Double> getLimitObservable(final double budget) {
    return Observable.fromIterable(getMarvelComicsList())
          .scan(0D, (aDouble, marvelComic) -> aDouble + marvelComic.getPrice()) 
          .takeWhile(aDouble -> aDouble < budget)
          .skip(1);
}
Run Code Online (Sandbox Code Playgroud)

上面的代码使用scan(也称为累加器)运算符来跟踪漫画的总价格.更多细节在这里.所以,现在从新的Observable返回一个double(表示总量).在此之后,我们takeWhile停止物品的排放,直到条件保持为真.最后我跳过了第一个项目,因为上面提到的Observable将发出至少一个项目(在条件可以验证之前).

Observable.zip(getLimitObservable(500d), Observable.fromIterable(getMarvelComicsList()), (aDouble, marvelComic) -> marvelComic)
                .subscribe(marvelComic -> Log.d("test", "comic: " + marvelComic.getName()));
Run Code Online (Sandbox Code Playgroud)

现在我将之前的observable与一个新的observable(使用zip运算符)组合在一起,为每一个项目生成一个新项目(一个来自第一个observable,一个来自第二个),这样你就会得到一个项目等于从两个可观察者发出的最小项目数.更多细节在这里

这将打印列表中的第一个漫画列表,直到达到预算限制.

我打赌有更好的解决方案,但这只是一个例子.