RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?

Ste*_*let 20 java functional-programming reactive-programming rx-java

我正在学习RxJava,并且,作为我的第一个实验,尝试在此代码中的第一个run()方法中重写代码(在Netflix的博客中引用,作为RxJava可以帮助解决的问题)使用RxJava改进其异步性,即它不会f1.get()在继续执行其余代码之前,请等待第一个Future()的结果.

f3取决于f1.我看到如何处理这个,flatMap似乎做了诀窍:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });
Run Code Online (Sandbox Code Playgroud)

接下来,f4f5依靠f2.我有这个:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });
Run Code Online (Sandbox Code Playgroud)

这开始变得奇怪(merge他们可能不是我想要的......)但是允许我在最后这样做,而不是我想要的:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100
Run Code Online (Sandbox Code Playgroud)

这是所有数字,但不幸的是我在单独的调用中得到结果,所以我不能完全替换原始代码中的最终println:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
Run Code Online (Sandbox Code Playgroud)

我不明白如何在同一行上访问这两个返回值.我想可能有一些功能性编程我在这里缺少.我怎样才能做到这一点?谢谢.

use*_*713 19

看起来你真正需要的是对如何使用RX的更多鼓励和观点.我建议你阅读更多文档和大理石图(我知道它们并不总是有用).我还建议调查lift()函数和运算符.

  • 可观察的整个点是将数据流和数据操作连接成单个对象
  • 调用的点map,flatMapfilter有操纵您的数据流中的数据
  • 合并点是组合数据流
  • 运算符的要点是允许您中断稳定的可观察流并在数据流上定义自己的操作.例如,我编码了一个移动平均算子.这总结了n double一个Observable of doubles来返回一系列移动平均线.代码字面上看起来像这样

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

你会感到宽慰的是,许多你认为理所当然的过滤方法都lift()在幕后.

照这样说; 合并多个依赖项所需的全部是:

  • 使用map或将所有传入数据更改为标准数据类型flatMap
  • 将标准数据类型合并到流中
  • 如果一个对象需要在另一个对象上等待,或者您需要在流中订购数据,则使用自定义运算符.注意:这种方法会减慢流的速度
  • 用于列出或订阅以收集所有这些数据


Ste*_*let 7

编辑:有人将以下文本转换为答案,我将其作为问题的编辑添加到答案中,我很欣赏,并且理解可能是正确的SO要做的事情,但我不认为这是一个答案,因为它显然不是正确的方法.我不会使用此代码,也不会建议任何人复制它.其他/更好的解决方案和评论欢迎!


我能用以下方法解决这个问题.我没有意识到你可以flatMap不止一次观察,我认为结果只能消耗一次.所以我只是flatMapf2Observable两次(对不起,我在我的原始帖子中重命名代码中的一些东西),然后zip在所有的Observables上,然后订阅它.这Mapzip聚集的值是因为类型杂耍的不希望的.其他/更好的解决方案和评论欢迎!完整的代码是一个要点可见.谢谢.

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     
Run Code Online (Sandbox Code Playgroud)

这会产生我想要的输出:

responseB_responseA => 714000
Run Code Online (Sandbox Code Playgroud)