如何在RxJava2的链中批量处理Flowable输出

Ang*_*Koh 0 rx-java rx-java2

如何在RxJava2中链接要批量处理的流程。下面的流程图是我想要实现的。

Flowable#1          Flowable#2 (process every 10)
==============     ================================
callServer(p1) ->
      :        ->  saveToDatabase(List<r1 to r10>)
callServer(p20)->  saveToDatabase(List<r11 to r20>)
callServer(p21)->      :
      :                :
callServer(p35)->  saveToDatabase(List<r31 to r35>) //the remainder
Run Code Online (Sandbox Code Playgroud)

当前,我要等待所有结果返回,然后再保存到数据库中。

Flowable.fromIterable(paramList)
    .map(p -> callServer(p)) 
    //wait for the return a map of ALL the results r  
    //how to chain it such that saveToDatabase process after 'n' results 
    .toList()  
    .flatmap(listOfR -> saveToDatabase(listOfR); 
Run Code Online (Sandbox Code Playgroud)

我如何做到在每个“ n”个结果之后调用saveToDatabase而不是等待所有结果完成?

yos*_*riz 5

使用带有n作为参数的buffer()运算符,buffer将从源中收集n个项目,Observable并发出n个项目的List。
因此,您可以一次处理n个项目,并将它们保存到数据库中