如何使用RxJava处理分页?

joh*_*guy 17 rx-java

我正在考虑转换我的Android应用程序以使用Rxjava进行网络请求.我目前访问的Web服务类似于:

getUsersByKeyword(String query, int limit, int offset)
Run Code Online (Sandbox Code Playgroud)

据我了解,Observables是一个"推"而不是一个"拉"接口.所以这就是我理解要解决的问题:

  • app注册服务,获取Observable进行查询
  • 结果被推送到应用程序
  • 应用处理结果
  • 当app想要更多结果时......?

这就是我崩溃的地方.以前我只是问网络服务我想要什么,再次使用偏移量进行查询.但在这种情况下,将涉及创建另一个Observable并订阅它,有点打败这一点.

我应该如何在我的应用程序中处理分页?(这是一个Android应用程序,但我不认为这是相关的).

xox*_*_89 9

这是硬摇滚!)所以我们有网络请求:
getUsersByKeyword(String query, int limit, int offset)
并且此请求返回例如
List< Result >
如果我们使用RetroFit进行网络请求将会看起来:
Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
结果我们想Result从服务器获取所有内容.
所以它看起来像这样

int page = 50;
int limit = page;
Observable
                .range(0, Integer.MAX_VALUE - 1)
                .concatMap(new Func1<Integer, Observable<List<Result>>>() {
                    @Override
                    public Observable<List<Result>> call(Integer integer) {
                        return getUsersByKeyword(query, integer * page, limit);
                    }
                })
                .takeWhile(new Func1<List<Result>, Boolean>() {
                    @Override
                    public Boolean call(List<Result> results) {
                        return !results.isEmpty();
                    }
                })
                .scan(new Func2< List<Result>, List<Result>, List<Result>>() {
                    @Override
                    public List<Result> call(List<Result> results, List< Result> results2) {
                        List<Result> list = new ArrayList<>();
                        list.addAll(results);
                        list.addAll(results2);
                        return list;
                    }
                })
                .last()
                .subscribe(new Subscriber<List<Result>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<Results> results) {
                    }
                });
Run Code Online (Sandbox Code Playgroud)

代码被测试了!


lop*_*par 5

所以,如果这是单向分页,这是你可能会尝试的模式.此代码尚未运行或编译,但我试图过度注释以解释发生了什么.

private static final int LIMIT = 50;

// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();  

// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;

queries.switchMap((query) -> getNextPageEvents()
        // Ignore 'next page' pokes when unable to take them.
        .onBackPressureDrop()
        // Seed with the first page.
        .startWith(new NextPageEvent())
        // Increment the page number on each request.
        .scan(0, (page, event) -> page + 1) 
        // Request the page from the server.
        .concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
                // Unroll Observable<List<User> into Observable<User>
                .concatMap((userList) -> Observable.from(userList))
                .retryWhen(/** Insert your favorite retry logic here. */))
        // Only process new page requests sequentially.
        .scheduleOn(Schedulers.trampoline())
        // Trampoline schedules on the 'current thread', so we make sure that's
        // a background IO thread.
        .scheduleOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)

这应该让"下一页事件"信号每次都触发下一页数据的加载,并且如果遇到加载错误的页面,则不跳页.如果收到新的搜索查询,它也会在顶层完全重新启动.如果我(或其他人?)有时间,我想检查我对蹦床和背压的假设,并确保它阻止任何在加载时过早地获取下一页的企图.


And*_*ltz 2

我已经做到了这一点,实际上并不难。

该方法是在firstRequestsObservable 中对每个第一个请求(偏移量0)进行建模。为了简单起见,您可以将其设置为 PublishSubject,您可以在其中调用onNext()以提供下一个请求,但是有更智能的非Subject 方法可以做到这一点(例如,如果单击按钮时完成请求,则 requestObservable 是通过一些运算符映射的 clickObservable)。

一旦就位firstRequestsObservable,您可以responseObservable通过 flatMapping 等等firstRequestsObservable,来进行服务调用。

现在技巧来了:创建另一个subsequentRequestsObservable从 映射的可观察对象responseObservable,增加偏移量(为此,最好在响应数据中包含原始请求的偏移量)。一旦引入了这个可观察量,您现在必须更改 的定义,responseObservable以便它也依赖于subsequentRequestsObservable。然后你会得到一个像这样的循环依赖:

首先RequestsObservable -> responseObservable ->后续RequestsObservable ->responseObservable ->后续RequestsObservable -> ...

为了打破这个循环,您可能希望filter在 的定义中包含一个运算符subsequentRequestsObservable,过滤掉偏移量超过“总”限制的那些情况。循环依赖还意味着您需要将其中之一作为主题,否则将无法声明可观察量。我建议responseObservable成为那个主题。

因此,总而言之,您首先将responseObservable初始化为Subject,然后声明firstRequestsObservable,然后声明后续RequestsObservable作为通过一些运算符传递responseObservable的结果。然后可以使用“输入”responseObservable onNext

  • @AndréStaltz你是对的,准备一个例子需要一些时间,但如果没有这个例子,你的想法就毫无用处,因为我们大多数人都无法理解你的概念。这可能就是为什么你的答案不被接受,唯一的+1来自我。 (5认同)