May*_*ies 21 java android reactive-programming rx-java rx-android
我有两个Apis.Api 1给了我一个项目列表,Api 2给了我更多关于我从Api获得的每个项目的详细信息.到目前为止,我解决它的方式导致性能不佳.
在Retrofit和RxJava的帮助下,快速,快速地解决了这个问题.
在片刻我的解决方案看起来像这样:
第1步:Single<ArrayList<Information>>从Api 1 执行Retrofit .
第2步:我遍历这些项目并向Api 2发出请求.
第3步:改造退货按顺序执行Single<ExtendedInformation>每个项目
步骤4:在完成Api 2的所有调用完成后,我为组合信息和扩展信息的所有项创建一个新对象.
public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};
for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}
public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}
@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}
public interface RatingRequestListener {
void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
Run Code Online (Sandbox Code Playgroud)
Bri*_*ice 27
tl; dr使用concatMapEager或者flatMap异步或在调度程序上执行子调用.
很长的故事
我不是Android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2).
如果我得到正确的图片,所需的流程是:
some query param
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param
Run Code Online (Sandbox Code Playgroud)
假设Retrofit为客户生成了
interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
Run Code Online (Sandbox Code Playgroud)
如果项目的顺序不重要,则可以flatMap仅使用:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
但只有配置了改装构建器
使用异步适配器(调用将在okhttp内部执行程序中排队).我个人认为这不是一个好主意,因为你无法控制这个执行者.
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
Run Code Online (Sandbox Code Playgroud)或者使用基于调度程序的适配器(将在RxJava调度程序上调度调用).这是我的首选方案,因为您明确选择使用哪个调度程序,它很可能是IO调度程序,但您可以自由尝试不同的调度程序.
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
Run Code Online (Sandbox Code Playgroud)原因是flatMap将订阅由其创建的每个observable api2.extendedInfo(...)并将其合并到生成的observable中.因此,结果将按照收到的顺序显示.
如果改装客户端未设置为异步或设置为在调度程序上运行,则可以设置一个:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
这个结构几乎与之前的一个execpts相同,它在本地指示每个api2.extendedInfo应该运行的调度程序.
可以调整maxConcurrency参数flatMap以控制您想要同时执行的请求数.虽然我对此要谨慎,但您不希望同时运行所有查询.通常默认值maxConcurrency足够好(128).
现在如果原始查询的顺序很重要.concatMap通常是运算符flatMap按顺序执行相同的操作,但顺序运行,如果代码需要等待执行所有子查询,则结果会很慢.该解决方案更进了一步concatMapEager,这个将按顺序订阅observable,并根据需要缓冲结果.
假设改造客户端是异步的或在特定的调度程序上运行:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
或者如果必须在本地设置调度程序:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
Run Code Online (Sandbox Code Playgroud)
也可以在此运算符中调整并发性.
此外,如果Api返回Flowable,则可以.parallel在RxJava 2.1.7中使用目前仍处于测试阶段的Api.但是后来结果不合适而且我不知道一种方式(但是?)订购它们而不进行排序.
api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>
Run Code Online (Sandbox Code Playgroud)
该flatMap运营商的设计,以满足这些类型的工作流程。
我将用一个简单的五步示例来概述大致的笔触。希望您可以轻松地在代码中重构相同的原则:
@Test fun flatMapExample() {
// (1) constructing a fake stream that emits a list of values
Observable.just(listOf(1, 2, 3, 4, 5))
// (2) convert our List emission into a stream of its constituent values
.flatMap { numbers -> Observable.fromIterable(numbers) }
// (3) subsequently convert each individual value emission into an Observable of some
// newly calculated type
.flatMap { number ->
when(number) {
1 -> Observable.just("A1")
2 -> Observable.just("B2")
3 -> Observable.just("C3")
4 -> Observable.just("D4")
5 -> Observable.just("E5")
else -> throw RuntimeException("Unexpected value for number [$number]")
}
}
// (4) collect all the final emissions into a list
.toList()
.subscribeBy(
onSuccess = {
// (5) handle all the combined results (in list form) here
println("## onNext($it)")
},
onError = { error ->
println("## onError(${error.message})")
}
)
}
Run Code Online (Sandbox Code Playgroud)
(顺便说一句,如果排放的顺序很重要,请考虑使用concatMap)。
我希望有帮助。