合并一个Observable列表并等待所有完成

Cra*_*ell 76 java reactive-programming rx-java rx-android

TL; DR 如何转换Task.whenAll(List<Task>)RxJava

我现有的代码使用Bolts构建异步任务列表,并等待所有这些任务完成后再执行其他步骤.从本质上讲,它构建一个List<Task>并返回一个单独的Task,当列表中的所有任务完成时,按照Bolts站点上的示例标记为已完成.

我期待,以取代BoltsRxJava和我假设建立的异步任务列表(在事先不知道大小)和包装他们都到一个单一的这种方法Observable是可行的,但我不知道怎么办.

我试着看merge,zip,concat等...但不能去上工作List<Observable>,我会被建立,因为他们似乎都面向工作的只有两个Observables,如果我理解正确的文档在一个时间.

我正在努力学习RxJava并且仍然是新手,所以请原谅我,如果这是一个明显的问题,或者在某个地方的文档中解释过; 我试过搜索.任何帮助将非常感激.

MyD*_*Tom 72

您可以使用flatMap以防万一你有动态任务组合.像这样的东西:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}
Run Code Online (Sandbox Code Playgroud)

并行执行的另一个好例子

注意:我真的不知道您对错误处理的要求.例如,如果只有一个任务失败,该怎么办.我认为你应该验证这种情况.

  • 考虑到该问题表明"当列表中的所有任务完成"时,这应该是可接受的答案.一旦完成其中一项任务,"zip"就会通知完成,因此不适用. (13认同)
  • 这似乎不适用于RxJava2 (4认同)
  • @PoojaGaikwad使用lambda它更具可读性.只需将第一个lambda替换为`new Func1 <Observable <Boolean>,Observable <Boolean >>()...`,第二个替换为`new Func1 <List <Boolean>,Boolean>()` (3认同)

Mal*_*alt 62

听起来你正在寻找Zip运营商.

有几种不同的使用方法,让我们看一个例子.假设我们有一些不同类型的简单可观察量:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
Run Code Online (Sandbox Code Playgroud)

等待它们的最简单方法是这样的:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)

请注意,在zip函数中,参数具有与要压缩的可观察对象的类型相对应的具体类型.

也可以直接压缩可观察列表:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)

...或将列表包装成Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)

但是,在这两种情况下,zip函数只能接受单个Object[]参数,因为列表中的observable类型事先不知道,也不知道它们的编号.这意味着zip函数必须检查参数的数量并相应地转换它们.

无论如何,所有上述示例最终都将打印出来 1 Blah true

编辑:使用Zip时,请确保Observables所有拉链都发出相同数量的项目.在上面的例子中,所有三个可观察者都发出了一个项目.如果我们要把它们改成这样的东西:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
Run Code Online (Sandbox Code Playgroud)

然后1, Blah, True,2, Hello, True将是传递到zip函数的唯一项目.3由于其他可观测量已完成,因此该项目永远不会被压缩.

  • 如果其中一个调用失败,这将不起作用.在这种情况下,所有呼叫都将丢失. (9认同)

eis*_*eis 12

在提出的建议中,zip()实际上将可观察的结果相互结合,这可能是也可能不是想要的,但在问题中没有被问到.在这个问题中,所有需要的是每个操作的执行,一个接一个或并行(没有指定,但链接的Bolts示例是关于并行执行).此外,当任何可观察量完成时,zip()将立即完成,因此它违反了要求.

对于Observables的并行执行,另一个答案中提供的 flatMap()很好,但merge()会更直接.请注意,如果您推迟退出直到所有可观察对象都已完成,则合并将在任何Observable的错误时退出,您应该查看mergeDelayError().

对于一个一个,我认为应该使用Observable.concat()静态方法.它的javadoc如下:

concat(java.lang.Iterable> sequences)将一个Observable的Iterable展平为一个Observable,一个接一个地展开,而不交错它们

如果你不想要并行执行,这听起来就像你所追求的那样.

此外,如果您只对完成任务感兴趣,而不是返回值,则应该查看Completable而不是Observable.

TLDR:对于完成任务和oncompletion事件的逐个执行,我认为Completable.concat()最适合.对于并行执行,Completable.merge()或Completable.mergeDelayError()听起来像解决方案.前一个将立即停止任何可完成的任何错误,后者将执行所有错误,即使其中一个有错误,然后才报告错误.