在恢复之前等待RxJs.Subscriptions完成

tan*_*pac 14 rxjs angular

在我的Angular 2应用程序中,我需要发出一系列的http请求.我有两个服务,A和B,每个服务都发出请求,A.get()B.get()从API获取数据并将它们存储在他们的服务中.这两者可以在同一时间被调用,但是我的第三个请求,doSomething()这取决于结果A.get()B.get().由于两个A.get()B.get()正在存储他们的反应当地的返回值,最终成为RxJs认购.像这样:

class A{
  public data;
  public get(){
    return api.call(params).subscribe((response)=>{ this.data = response.json();})
  }
}

class B{
  public data;
  public get(){
    return api.call(params).subscribe((response)=>{ this.data = response.json();})
  }
}
Run Code Online (Sandbox Code Playgroud)

我的组件看起来像这样:

class MyComponent{
  constructor(private a: A, private b: B){
    a.get();
    b.get();
    this.doSomething(a.data, b.data);
  }
  doSomething(aData, bData){
    ...
  }
}
Run Code Online (Sandbox Code Playgroud)

我的问题是doSomething()失败,因为a.get()b.get()完成了http请求.我需要一种方法来控制呼叫,doSomething()直到我的其他呼叫完成.我一直在搜索但是没有运气这个问题.RxJs文档提供了一些可以合并的方法,Observables但这不是我在这种情况下的方法.

Ale*_*nov 7

那么,你想要的就是这个:

class A {

    private data;

    public get(): Observable<any> {
        // this is a very primitive caching just to show concept
        if (this.data) {
            return Observable.of(this.data);
        }
        // In real life it would be something like this:
        //  let call = this.http.get(...).map(r => r.json())
        let call = Observable.of("some value A");
        return call.do(s => this.data = s);
    }

}

class B {

    private data;

    public get(): Observable<any> {
        if (this.data) {
            return Observable.of(this.data);
        }
        let call = Observable.of("some value B");
        return call.do(s => this.data = s);
    }

}

class MyComponent {
    constructor(private a: A, private b: B) {
        Observable
            .zip(this.a.get(), this.b.get(), (a: any, b: any) => { return { a: a, b: b } })
            .subscribe((r) => {
                this.doSomething(r.a, r.b);
            });
    }
    doSomething(aData, bData) {
        console.log("aData", aData);
        console.log("bData", bData);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是对代码的修改.如您所见,无需订阅服务组件内的observable.甚至不需要在服务组件之外单独订阅它们.我们可以通过以某种方式组合可观察量直接在单个最终订阅()中获得最终结果.

UPDATE

do()和subscribe()之间有什么区别.

稍微简化(跳过热的observables),可观察的管道在你订阅它之前不会开始做任何事情.do()只是众多操作符中的一个,这些操作符被设计为在您的observable链中有一些"副作用",例如它可以在可观察的管道中间输出一些中间结果到控制台以进行调试.这是主要的区别.所以,你可以在subscribe()中获得一些东西而不用do(),但是如果没有subscribe(),你就不会得到do()中的任何东西.


and*_*rob 5

用于Observable.forkJoin通过将订阅移至组件来轻松解决您的问题。它需要一组冷观察值,订阅后,您会收到一组结果。像这样的东西:

Observable.forkJoin([ 
    a.get(),
    b.get()
]).subscribe(
    results => {
        doSomething(results[0], results[1]);
    },
    err => {
        //handle error
    }
);
Run Code Online (Sandbox Code Playgroud)

在两个请求都传入之前,不会调用订阅中的回调。

如果 或a.get()失败b.get(),将调用错误方法