Cra*_*ell 76 java reactive-programming rx-java rx-android
TL; DR
如何转换Task.whenAll(List<Task>)
成RxJava
?
我现有的代码使用Bolts构建异步任务列表,并等待所有这些任务完成后再执行其他步骤.从本质上讲,它构建一个List<Task>
并返回一个单独的Task
,当列表中的所有任务完成时,按照Bolts站点上的示例标记为已完成.
我期待,以取代Bolts
与RxJava
和我假设建立的异步任务列表(在事先不知道大小)和包装他们都到一个单一的这种方法Observable
是可行的,但我不知道怎么办.
我试着看merge
,zip
,concat
等...但不能去上工作List<Observable>
,我会被建立,因为他们似乎都面向工作的只有两个Observables
,如果我理解正确的文档在一个时间.
我正在努力学习RxJava
并且仍然是新手,所以请原谅我,如果这是一个明显的问题,或者在某个地方的文档中解释过; 我试过搜索.任何帮助将非常感激.
MyD*_*Tom 72
您可以使用flatMap
以防万一你有动态任务组合.像这样的东西:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplemete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
}
Run Code Online (Sandbox Code Playgroud)
注意:我真的不知道您对错误处理的要求.例如,如果只有一个任务失败,该怎么办.我认为你应该验证这种情况.
Mal*_*alt 62
听起来你正在寻找Zip运营商.
有几种不同的使用方法,让我们看一个例子.假设我们有一些不同类型的简单可观察量:
Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
Run Code Online (Sandbox Code Playgroud)
等待它们的最简单方法是这样的:
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)
请注意,在zip函数中,参数具有与要压缩的可观察对象的类型相对应的具体类型.
也可以直接压缩可观察列表:
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);
Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)
...或将列表包装成Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList);
Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
Run Code Online (Sandbox Code Playgroud)
但是,在这两种情况下,zip函数只能接受单个Object[]
参数,因为列表中的observable类型事先不知道,也不知道它们的编号.这意味着zip函数必须检查参数的数量并相应地转换它们.
无论如何,所有上述示例最终都将打印出来 1 Blah true
编辑:使用Zip时,请确保Observables
所有拉链都发出相同数量的项目.在上面的例子中,所有三个可观察者都发出了一个项目.如果我们要把它们改成这样的东西:
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
Run Code Online (Sandbox Code Playgroud)
然后1, Blah, True
,2, Hello, True
将是传递到zip函数的唯一项目.3
由于其他可观测量已完成,因此该项目永远不会被压缩.
eis*_*eis 12
在提出的建议中,zip()实际上将可观察的结果相互结合,这可能是也可能不是想要的,但在问题中没有被问到.在这个问题中,所有需要的是每个操作的执行,一个接一个或并行(没有指定,但链接的Bolts示例是关于并行执行).此外,当任何可观察量完成时,zip()将立即完成,因此它违反了要求.
对于Observables的并行执行,另一个答案中提供的 flatMap()很好,但merge()会更直接.请注意,如果您推迟退出直到所有可观察对象都已完成,则合并将在任何Observable的错误时退出,您应该查看mergeDelayError().
对于一个一个,我认为应该使用Observable.concat()静态方法.它的javadoc如下:
concat(java.lang.Iterable> sequences)将一个Observable的Iterable展平为一个Observable,一个接一个地展开,而不交错它们
如果你不想要并行执行,这听起来就像你所追求的那样.
此外,如果您只对完成任务感兴趣,而不是返回值,则应该查看Completable而不是Observable.
TLDR:对于完成任务和oncompletion事件的逐个执行,我认为Completable.concat()最适合.对于并行执行,Completable.merge()或Completable.mergeDelayError()听起来像解决方案.前一个将立即停止任何可完成的任何错误,后者将执行所有错误,即使其中一个有错误,然后才报告错误.