无需递归即可对 Observable 结果进行分页 - RxJava

Ben*_*ith 4 java pagination reactive-programming observable rx-java

我有一个非常标准的 API 分页问题,​​您可以通过一些简单的递归来处理。这是一个虚构的例子:

public Observable<List<Result>> scan() {
    return scanPage(Optional.empty(), ImmutableList.of());
}

private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
            .flatMap(page -> {
                if (!page.getLastKey().isPresent()) {
                    return Observable.just(results);
                }
                return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
                        .addAll(results)
                        .addAll(page.getResults())
                        .build()
                );
            });
}
Run Code Online (Sandbox Code Playgroud)

但这显然会创建一个庞大的调用堆栈。如何强制执行此操作但维护 Observable 流?

这是一个命令式阻塞示例:

public List<Result> scan() {
    Optional<String> startKey = Optional.empty();
    final ImmutableList.Builder<Result> results = ImmutableList.builder();

    do {
        final Page page = this.scanner.scan(startKey);
        startKey = page.getLastKey();
        results.addAll(page.getResults());
    } while (startKey.isPresent());

    return results.build();
}
Run Code Online (Sandbox Code Playgroud)

Ada*_*hip 5

JohnWowUs 的回答很好,帮助我了解如何有效地避免递归,但有一些点我仍然感到困惑,所以我发布了我的调整版本。

概括:

  • 单个页面作为Single.
  • 使用 aFlowable来流式传输页面中包含的每个项目。这意味着我们函数的调用者不需要知道各个页面,只需收集包含的项目。
  • 使用 aBehaviorProcessor从第一页开始,如果下一页可用,则在我们检查当前页面后获取每个后续​​页面。
  • 关键是调用processor.onNext(int)开始下一次迭代。

此代码取决于rxjavareact -streams

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;

public class Pagination {

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
        // Processor issues page indices
        BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
        // When an index number is issued, fetch the corresponding page
        return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
                        // when returning the page, update the processor to get the next page (or stop)
                        .doOnNext(page -> {
                            if (page.hasNext()) {
                                processor.onNext(page.getNextPageIndex());
                            } else {
                                processor.onComplete();
                            }
                        })
                        .concatMapIterable(Page::getElements);
    }

    public static void main(String[] args) {
        fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
    }

    // A function to fetch a page of our paged data
    private static Single<Page<String>> examplePageFetcher(int index) {
        return Single.just(pages.get(index));
    }

    // Create some paged data
    private static ArrayList<Page<String>> pages = new ArrayList<>(3);

    static {
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
    }

    static class Page<T> {
        private List<T> elements;
        private Optional<Integer> nextPageIndex;

        public Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }

        public List<T> getElements() {
            return elements;
        }

        public int getNextPageIndex() {
            return nextPageIndex.get();
        }

        public boolean hasNext() {
            return nextPageIndex.isPresent();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

one
two
three
four
five
Run Code Online (Sandbox Code Playgroud)


Joh*_*wUs 4

这不是最优雅的解决方案,但您可以使用主题和副作用。请参阅下面的玩具示例

import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import rx.subjects.*;

public class Pagination {
    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>();

    public static void main(String[] args) throws InterruptedException {
        pages.put("default", new ArrayList<String>());
        pages.put("2", new ArrayList<String>());
        pages.put("3", new ArrayList<String>());
        pages.put("4", new ArrayList<String>());

        pages.get("default").add("2");
        pages.get("default").add("Maths");
        pages.get("default").add("Chemistry");  

        pages.get("2").add("3");
        pages.get("2").add("Physics");   
        pages.get("2").add("Biology"); 

        pages.get("3").add("4");
        pages.get("3").add("Art");   

        pages.get("4").add("");
        pages.get("4").add("Geography"); 



        Observable<List<String>> ret = Observable.defer(() -> 
        { 
            System.out.println("Building Observable");
            ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1);
            Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey -> 
            {
                if (!aKey.equals("")) {
                    return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0)));
                } else {
                    return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted());
                }
            });
            pagecontrol.onNext("default");
            return ret2;
        });
        // Use this if you want to ensure work isn't done again
        ret = ret.cache();
        ret.subscribe(l -> System.out.println("Sub 1 : " + l));
        ret.subscribe(l -> System.out.println("Sub 2 : " + l));
        Thread.sleep(2000L);
    }
}
Run Code Online (Sandbox Code Playgroud)

进行了改进并进行了编辑。