RXJS 如何在另一个中使用一个 observable 的结果(然后将这两个结果一起处理)

Jam*_*mes 4 rxjs angular rxjs-pipeable-operators rxjs6

使用 RxJs 时的一个非常常见的问题似乎是希望一个或多个 observables 的结果然后在后续的观察中使用它们。

例如在伪代码中(这不是故意的 rx 或有效的 js 语法)

var someResult = $observable-A; // wait to complete
var finalResult = $observable-B(someResult.aValueINeed);
Run Code Online (Sandbox Code Playgroud)

这可以以一种丑陋的方式完成,您可以订阅两者并在另一个内部调用一个..但是,这非常混乱并且不能为您提供很大的灵活性。

例如(真正的语法)

$observable-A.subscribe(resultA => { 
    $observable-B(resultA.aValueINeed)
        .subscribe(resultB => { 
            console.log('After everything completes: ', resultB); 
        }
}
Run Code Online (Sandbox Code Playgroud)

这也意味着当您完成两个 observable 时,没有其他任何东西可以轻松消耗此流。

我的特定用例需要以下内容:

  1. 对 $observable-A 的初始调用,因为它不需要其他任何东西(这是一个基本的 http GET 调用)
  2. 对另一个服务的 http GET 调用需要来自 $observable-A 的数据,该服务返回 $observable-B
  3. 使用两个可观察结果 (A + B) 为我的服务(在我的情况下,Angular 服务)创建一个对象以返回单个列表。

我还需要能够在我的服务中订阅这个功能,这就是为什么使用上面的订阅方法对我不起作用。

Jam*_*mes 10

Snorre Danielsen 解决了这个问题,这个解决方案完全归功于他。我建议看看这个

rxjs 操作符的伟大之处在于它们的互操作性。您可以混合和匹配几乎所有内容以获得所需的结果。

简而言之。这个问题的代码答案如下,我将进一步解释。

$observable-A.pipe(
    mergeMap(resultA => {
        return combineLatest(
            of(resultA),
            $observable-B(resultA)
        )
    }),
    map(([resultA, resultB]) => {
        // you can do anything with both results here
        // we can also subscribe to this any number of times because we are doing all the processing with
        // pipes and not completing the observable
    }
)
Run Code Online (Sandbox Code Playgroud)

mergeMap(或flatMap哪个是别名)将 observable 作为输入并对其进行投影(就像常规的 map 函数一样)。这意味着我们可以使用它的结果作为 的输入$observable-B。现在这非常适合您实际上只想返回第二个可观察结果。例如

$observable-A.pipe(
    mergeMap(resultA => $observable-B(resultA)),
    map((resultB) => {
        // resultA doesn't exist here and we can only manipulate resultB
    }
)

Run Code Online (Sandbox Code Playgroud)

现在,回到主要解决方案。combineLatest是这里的关键。它允许mergeMap本质上“等待”内部 observable ( $observable-B) 完成。没有它,您只会向 中的下一个 rxjs 运算符返回一个 observable pipe,这不是我们想要的(因为您希望处理 a 中的常规运算符函数pipe)。

因此,我们可以将这些新合并的 observable 带入链的下一部分并一起使用。我们使用该of()函数重新创建resultA,因为combineLatest只将 observables 作为输入,并且resultA在这种状态下是一个完整的 observable 。

提示:看一下map(([resultA, resultB]). 我们使用这种表示法来引用我们的变量而不是标准的原因map(results)。这样我们就可以直接引用它们而无需使用results[0]forresultAresults[1]for resultB。以这种方式阅读起来容易得多。


我希望这可以帮助人们解决问题,因为我还没有找到涵盖这种情况的完整 SO 答案。编辑很受欢迎,因为它很复杂,我相信它不是一个完美的答案。

  • 如果两个可观察量都来自 HTTP 请求,则与“mergeMap”和“combineLatest”相比,“switchMap”和“forkJoin”将更合适。合并多个请求的结果并不常见。“combineLatest”和“zip”运算符具有非常具体的用例,其中您需要在源可观察量发出后立即获取值(前提是所有源至少已发出一次)。 (2认同)