Ben*_*ith 4 java pagination reactive-programming observable rx-java
I have a fairly standard API pagination problem, which can be handled with some simple recursion. Here is a contrived example:
public Observable<List<Result>> scan() {
    return scanPage(Optional.empty(), ImmutableList.of());
}

private Observable<List<Result>> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
        .flatMap(page -> {
            // Accumulate this page's results first, so the final page is not dropped
            final List<Result> combined = ImmutableList.<Result>builder()
                .addAll(results)
                .addAll(page.getResults())
                .build();
            if (!page.getLastKey().isPresent()) {
                // No further key: this was the last page
                return Observable.just(combined);
            }
            // Recurse with the last key as the next start key
            return scanPage(page.getLastKey(), combined);
        });
}
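But this can obviously build up a very deep call stack. How can I do this imperatively while still keeping the Observable stream?
Here is an imperative, blocking example: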
public List<Result> scan() {
    Optional<String> startKey = Optional.empty();
    final ImmutableList.Builder<Result> results = ImmutableList.builder();
    do {
        final Page page = this.scanner.scan(startKey);
        startKey = page.getLastKey();
        results.addAll(page.getResults());
    } while (startKey.isPresent());
    return results.build();
}
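For anyone who wants to run the blocking loop, here is a minimal self-contained sketch of it. The `Page` class, the `scanOnce` method, and the keys `k1` and `k2` are hypothetical stand-ins for the real scanner, which is not shown here; results are plain strings rather than `Result` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the Page and scanner types (assumptions, not a real API).
final class BlockingScanSketch {

    static final class Page {
        final List<String> results;
        final Optional<String> lastKey; // empty on the final page

        Page(List<String> results, Optional<String> lastKey) {
            this.results = results;
            this.lastKey = lastKey;
        }
    }

    // Assumed scanner: returns the page that follows startKey from a fixed data set.
    static Page scanOnce(Optional<String> startKey) {
        if (!startKey.isPresent()) {
            return new Page(Arrays.asList("a", "b"), Optional.of("k1"));
        }
        if (startKey.get().equals("k1")) {
            return new Page(Arrays.asList("c", "d"), Optional.of("k2"));
        }
        return new Page(Arrays.asList("e"), Optional.empty());
    }

    // Keep scanning until a page comes back without a last key.
    static List<String> scan() {
        Optional<String> startKey = Optional.empty();
        List<String> results = new ArrayList<>();
        do {
            Page page = scanOnce(startKey);
            startKey = page.lastKey;
            results.addAll(page.results);
        } while (startKey.isPresent());
        return results;
    }

    public static void main(String[] args) {
        System.out.println(scan()); // prints [a, b, c, d, e]
    }
}
```

The loop keeps only one page in flight at a time and uses constant stack depth, which is the behavior I would like to preserve in the Observable version.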
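JohnWowUs' answer is great and helped me understand how to avoid the recursion effectively, but there were some points I was still confused about, so I'm posting my tweaked version.
Summary: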
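- The individual pages are returned as a Single.
- Use a Flowable to stream each of the items contained in the pages. This means callers of our function don't need to know about the individual pages and can just collect the contained items.
- Use a BehaviorProcessor to start with the first page, and fetch each subsequent page once we have checked in the current page whether a next page is available.
- The key is that the call to processor.onNext(int) kicks off the next iteration.

import java.util.ArrayList;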
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;

public class Pagination {

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
        // Processor issues page indices
        BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
        // When an index number is issued, fetch the corresponding page
        return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
            // when returning the page, update the processor to get the next page (or stop)
            .doOnNext(page -> {
                if (page.hasNext()) {
                    processor.onNext(page.getNextPageIndex());
                } else {
                    processor.onComplete();
                }
            })
            .concatMapIterable(Page::getElements);
    }

    public static void main(String[] args) {
        fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
    }

    // A function to fetch a page of our paged data
    private static Single<Page<String>> examplePageFetcher(int index) {
        return Single.just(pages.get(index));
    }

    // Create some paged data
    private static ArrayList<Page<String>> pages = new ArrayList<>(3);

    static {
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
    }

    static class Page<T> {
        private List<T> elements;
        private Optional<Integer> nextPageIndex;

        public Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }

        public List<T> getElements() {
            return elements;
        }

        public int getNextPageIndex() {
            return nextPageIndex.get();
        }

        public boolean hasNext() {
            return nextPageIndex.isPresent();
        }
    }
}
Output:
one
two
three
four
five
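To see why feeding page indices back through a processor does not grow the call stack, it may help to look at a library-free model of the same control flow. This is only a sketch and is not RxJava: a plain `Deque` stands in for the BehaviorProcessor, a `while` loop stands in for `concatMap`, and the names `FeedbackLoopSketch`, `pending`, and `demo` are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Library-free model of the "emit an index, fetch a page, emit the next index" loop.
final class FeedbackLoopSketch {

    static final class Page<T> {
        final List<T> elements;
        final Optional<Integer> nextPageIndex; // empty on the last page

        Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }
    }

    static <T> List<T> fetchItems(Function<Integer, Page<T>> fetchPage) {
        // The queue plays the role of the processor: it holds indices waiting to be fetched.
        Deque<Integer> pending = new ArrayDeque<>();
        pending.add(0); // like createDefault(0): start with the first page
        List<T> items = new ArrayList<>();
        while (!pending.isEmpty()) {
            Page<T> page = fetchPage.apply(pending.poll());
            // Like processor.onNext(nextIndex): schedule the next page, if there is one.
            page.nextPageIndex.ifPresent(pending::add);
            // Like concatMapIterable: flatten the page into individual items.
            items.addAll(page.elements);
        }
        // An empty queue corresponds to processor.onComplete().
        return items;
    }

    // Runs the loop over the same three pages of sample data used above.
    static List<String> demo() {
        List<Page<String>> pages = new ArrayList<>();
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.<Integer>empty()));
        return FeedbackLoopSketch.<String>fetchItems(index -> pages.get(index));
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Each fetch returns before the next one starts, so the depth of the stack stays constant no matter how many pages there are. The reactive version has the same shape, with the difference that each fetch may complete asynchronously.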
It isn't the most elegant of solutions, but you can use subjects and side effects. See the toy example below:
import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import rx.subjects.*;

public class Pagination {

    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>();

    public static void main(String[] args) throws InterruptedException {
        pages.put("default", new ArrayList<String>());
        pages.put("2", new ArrayList<String>());
        pages.put("3", new ArrayList<String>());
        pages.put("4", new ArrayList<String>());

        // The first element of each page is the key of the next page ("" means no more pages)
        pages.get("default").add("2");
        pages.get("default").add("Maths");
        pages.get("default").add("Chemistry");

        pages.get("2").add("3");
        pages.get("2").add("Physics");
        pages.get("2").add("Biology");

        pages.get("3").add("4");
        pages.get("3").add("Art");

        pages.get("4").add("");
        pages.get("4").add("Geography");

        Observable<List<String>> ret = Observable.defer(() ->
        {
            System.out.println("Building Observable");
            ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1);
            Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey ->
            {
                if (!aKey.equals("")) {
                    return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0)));
                } else {
                    return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted());
                }
            });
            pagecontrol.onNext("default");
            return ret2;
        });
        // Use this if you want to ensure work isn't done again
        ret = ret.cache();
        ret.subscribe(l -> System.out.println("Sub 1 : " + l));
        ret.subscribe(l -> System.out.println("Sub 2 : " + l));
        Thread.sleep(2000L);
    }
}
Edited with improvements.