RxJava组合请求序列

May*_*ies 21 java android reactive-programming rx-java rx-android

问题

我有两个Apis.Api 1给了我一个项目列表,Api 2给了我更多关于我从Api获得的每个项目的详细信息.到目前为止,我解决它的方式导致性能不佳.

问题

在Retrofit和RxJava的帮助下,快速,快速地解决了这个问题.

我的方法

在片刻我的解决方案看起来像这样:

第1步:Single<ArrayList<Information>>从Api 1 执行Retrofit .

第2步:我遍历这些项目并向Api 2发出请求.

第3步:改造退货按顺序执行Single<ExtendedInformation>每个项目

步骤4:在完成Api 2的所有调用完成后,我为组合信息和扩展信息的所有项创建一个新对象.

我的守则

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }
Run Code Online (Sandbox Code Playgroud)

Bri*_*ice 27

tl; dr使用concatMapEager或者flatMap异步或在调度程序上执行子调用.


很长的故事

我不是Android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2).

如果我得到正确的图片,所需的流程是:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param
Run Code Online (Sandbox Code Playgroud)

假设Retrofit为客户生成了

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
Run Code Online (Sandbox Code Playgroud)

如果项目的顺序不重要,则可以flatMap仅使用:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)

只有配置了改装构建器

  • 使用异步适配器(调用将在okhttp内部执行程序中排队).我个人认为这不是一个好主意,因为你无法控制这个执行者.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
    
    Run Code Online (Sandbox Code Playgroud)
  • 或者使用基于调度程序的适配器(将在RxJava调度程序上调度调用).这是我的首选方案,因为您明确选择使用哪个调度程序,它很可能是IO调度程序,但您可以自由尝试不同的调度程序.

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    
    Run Code Online (Sandbox Code Playgroud)

原因是flatMap将订阅由其创建的每个observable api2.extendedInfo(...)并将其合并到生成的observable中.因此,结果将按照收到的顺序显示.

如果改装客户端设置为异步或设置为在调度程序上运行,则可以设置一个:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)

这个结构几乎与之前的一个execpts相同,它在本地指示每个api2.extendedInfo应该运行的调度程序.

可以调整maxConcurrency参数flatMap以控制您想要同时执行的请求数.虽然我对此要谨慎,但您不希望同时运行所有查询.通常默认值maxConcurrency足够好(128).

现在如果原始查询的顺序很重要.concatMap通常是运算符flatMap按顺序执行相同的操作,但顺序运行,如果代码需要等待执行所有子查询,则结果会很慢.该解决方案更进了一步concatMapEager,这个将按顺序订阅observable,并根据需要缓冲结果.

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)

或者如果必须在本地设置调度程序:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)
Run Code Online (Sandbox Code Playgroud)

也可以在此运算符中调整并发性.


此外,如果Api返回Flowable,则可以.parallel在RxJava 2.1.7中使用目前仍处于测试阶段的Api.但是后来结果不合适而且我不知道一种方式(但是?)订购它们而不进行排序.

api.items(queryParam) // Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   .map(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>
Run Code Online (Sandbox Code Playgroud)


hom*_*man 7

flatMap运营商的设计,以满足这些类型的工作流程。

我将用一个简单的五步示例来概述大致的笔触。希望您可以轻松地在代码中重构相同的原则:

@Test fun flatMapExample() {
    // (1) constructing a fake stream that emits a list of values
    Observable.just(listOf(1, 2, 3, 4, 5))
            // (2) convert our List emission into a stream of its constituent values 
            .flatMap { numbers -> Observable.fromIterable(numbers) }
            // (3) subsequently convert each individual value emission into an Observable of some 
            //     newly calculated type
            .flatMap { number ->
                when(number) {
                       1 -> Observable.just("A1")
                       2 -> Observable.just("B2")
                       3 -> Observable.just("C3")
                       4 -> Observable.just("D4")
                       5 -> Observable.just("E5")
                    else -> throw RuntimeException("Unexpected value for number [$number]")
                }
            }
            // (4) collect all the final emissions into a list
            .toList()
            .subscribeBy(
                    onSuccess = {
                        // (5) handle all the combined results (in list form) here
                        println("## onNext($it)")
                    },
                    onError = { error ->
                        println("## onError(${error.message})")
                    }
            )
}
Run Code Online (Sandbox Code Playgroud)

(顺便说一句,如果排放的顺序很重要,请考虑使用concatMap)。

我希望有帮助。