我正在考虑转换我的Android应用程序以使用Rxjava进行网络请求.我目前访问的Web服务类似于:
getUsersByKeyword(String query, int limit, int offset)
Run Code Online (Sandbox Code Playgroud)
据我了解,Observables是一个"推"而不是一个"拉"接口.所以这就是我理解要解决的问题:
这就是我崩溃的地方.以前我只是问网络服务我想要什么,再次使用偏移量进行查询.但在这种情况下,将涉及创建另一个Observable并订阅它,有点打败这一点.
我应该如何在我的应用程序中处理分页?(这是一个Android应用程序,但我不认为这是相关的).
这是硬摇滚!)所以我们有网络请求:
getUsersByKeyword(String query, int limit, int offset)
并且此请求返回例如
List< Result >
如果我们使用RetroFit进行网络请求将会看起来:
Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
结果我们想Result从服务器获取所有内容.
所以它看起来像这样
int page = 50;
int limit = page;
Observable
.range(0, Integer.MAX_VALUE - 1)
.concatMap(new Func1<Integer, Observable<List<Result>>>() {
@Override
public Observable<List<Result>> call(Integer integer) {
return getUsersByKeyword(query, integer * page, limit);
}
})
.takeWhile(new Func1<List<Result>, Boolean>() {
@Override
public Boolean call(List<Result> results) {
return !results.isEmpty();
}
})
.scan(new Func2< List<Result>, List<Result>, List<Result>>() {
@Override
public List<Result> call(List<Result> results, List< Result> results2) {
List<Result> list = new ArrayList<>();
list.addAll(results);
list.addAll(results2);
return list;
}
})
.last()
.subscribe(new Subscriber<List<Result>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Results> results) {
}
});
Run Code Online (Sandbox Code Playgroud)
代码被测试了!
所以,如果这是单向分页,这是你可能会尝试的模式.此代码尚未运行或编译,但我试图过度注释以解释发生了什么.
private static final int LIMIT = 50;
// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();
// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;
queries.switchMap((query) -> getNextPageEvents()
// Ignore 'next page' pokes when unable to take them.
.onBackPressureDrop()
// Seed with the first page.
.startWith(new NextPageEvent())
// Increment the page number on each request.
.scan(0, (page, event) -> page + 1)
// Request the page from the server.
.concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
// Unroll Observable<List<User> into Observable<User>
.concatMap((userList) -> Observable.from(userList))
.retryWhen(/** Insert your favorite retry logic here. */))
// Only process new page requests sequentially.
.scheduleOn(Schedulers.trampoline())
// Trampoline schedules on the 'current thread', so we make sure that's
// a background IO thread.
.scheduleOn(Schedulers.io());
Run Code Online (Sandbox Code Playgroud)
这应该让"下一页事件"信号每次都触发下一页数据的加载,并且如果遇到加载错误的页面,则不跳页.如果收到新的搜索查询,它也会在顶层完全重新启动.如果我(或其他人?)有时间,我想检查我对蹦床和背压的假设,并确保它阻止任何在加载时过早地获取下一页的企图.
我已经做到了这一点,实际上并不难。
该方法是在firstRequestsObservable 中对每个第一个请求(偏移量0)进行建模。为了简单起见,您可以将其设置为 PublishSubject,您可以在其中调用onNext()以提供下一个请求,但是有更智能的非Subject 方法可以做到这一点(例如,如果单击按钮时完成请求,则 requestObservable 是通过一些运算符映射的 clickObservable)。
一旦就位firstRequestsObservable,您可以responseObservable通过 flatMapping 等等firstRequestsObservable,来进行服务调用。
现在技巧来了:创建另一个subsequentRequestsObservable从 映射的可观察对象responseObservable,增加偏移量(为此,最好在响应数据中包含原始请求的偏移量)。一旦引入了这个可观察量,您现在必须更改 的定义,responseObservable以便它也依赖于subsequentRequestsObservable。然后你会得到一个像这样的循环依赖:
首先RequestsObservable -> responseObservable ->后续RequestsObservable ->responseObservable ->后续RequestsObservable -> ...
为了打破这个循环,您可能希望filter在 的定义中包含一个运算符subsequentRequestsObservable,过滤掉偏移量超过“总”限制的那些情况。循环依赖还意味着您需要将其中之一作为主题,否则将无法声明可观察量。我建议responseObservable成为那个主题。
因此,总而言之,您首先将responseObservable初始化为Subject,然后声明firstRequestsObservable,然后声明后续RequestsObservable作为通过一些运算符传递responseObservable的结果。然后可以使用“输入”responseObservable onNext。
| 归档时间: |
|
| 查看次数: |
6298 次 |
| 最近记录: |