你将如何从函数返回 Observable<T>

sre*_*moh 7 observable rxjs typescript angular

我有这个功能:

  getData(): Observable<ServerResponse> {
    const data = {} as ServerResponse;

    data.client = this.getClientData().pipe(
      map(response =>
        response.map(x => {
          return x.data.client;
        }),
      ),
    );

    data.otherData = this.otherData().pipe(
      map(response =>
        response.map(x => {
          return this.groupByTimePeriod(x.data.other, 'day', 'week');
        }),
      ),
    );
  }
Run Code Online (Sandbox Code Playgroud)

该函数需要返回data分配给函数内部的所有属性。你会怎么做?

如果我只是返回data它就不起作用this.getClientData()并且this.otherData()尚未完成。

Dea*_*ean 8

好吧,您在这里有多个问题/问题。我将从最简单的开始。你如何从一个函数/对象中得到一个 observable?答案是通过观察到

return of(data);
Run Code Online (Sandbox Code Playgroud)

但是你避开了一个更大的问题,那就是:你如何推迟返回数据直到子可观察对象发出它们的值?您正在寻找forkJoin。通过文档:

forkJoin将等待所有传递的 Observables 完成,然后它会发出一个数组,其中包含来自相应 Observables 的最后一个值。因此,如果您将nObservables传递给运算符,结果数组将具有n值,其中第一个值是第一个 Observable 发出的最后一个值,第二个值是第二个 Observable 发出的最后一个值,依此类推。这意味着forkJoin不会发出多次,之后会完成。

您还有其他几个问题。例如,您永远不会订阅this.getClientData()this.otherData()。Observable 被懒惰地执行。observable 中的代码将不会执行,直到有人订阅它。从文档

里面的代码Observable.create(function subscribe(observer) {...})代表了一个“Observable execution”,这是一个惰性计算,它只发生在每个订阅了 Observer 的 Observer 上

看起来好像您正在pipe/map尝试在data对象上设置属性。但是您永远不会设置data.clientor data.other,因此它们将始终为空。

因此,将它们放在一起,这就是您的代码可能的样子,模拟服务器延迟以显示forkJoin等待两个可观察对象的完成:

import { Injectable } from '@angular/core';
import { Observable, of, forkJoin } from 'rxjs';
import { delay } from 'rxjs/operators';

@Injectable({
    providedIn: 'root'
})
export class TestService {
    getData(): Observable<ServerResponse> {
        const allOperations = forkJoin(
            this.getClientData(),
            this.getOtherData()
        );

        const observable = Observable.create(function subscribe(observer) {
            // Wait until all operations have completed
            allOperations.subscribe(([clientData, otherData]) => {
                const data = new ServerResponse;
                // Update your ServerReponse with client and other data
                data.otherdata = otherData.other;
                data.client = clientData.client;
                // Now that data is 100% populated, emit to anything subscribed to getData().
                observer.next(data);
                observer.complete();
            });

        });
        // We return the observable, with the code above to be executed only once it is subscribed to
        return observable;
    }

    getClientData() : Observable<any> {
        return of({ client: 'Client 1' });
    }
    getOtherData(): Observable<any> {
        // Fake server latency
        return of({ other: 'Other data that takes a while to return from server...' })
            .pipe(delay(2000));
    }
}

export class ServerResponse {
    client: string;
    otherdata: string;
}
Run Code Online (Sandbox Code Playgroud)

如果您调用getData() 并订阅 observable,您将看到它forkJoin按预期工作,我们必须等待 2 秒才能让子 observable 完成并且我们的 observable 发出一个值:

this.testService.getData().subscribe(data => {
  console.log(data);
});
Run Code Online (Sandbox Code Playgroud)

看来您可能是 RxJS / 异步编程的新手。我建议您有机会时阅读RxJs的优秀介绍。一开始可能会很棘手,但随着练习,这将成为第二天性。