是什么触发了CombineLatest?

Ale*_*lii 6 observable rxjs combinelatest

我有一些可观察的东西。而且我需要知道哪个触发了订阅。

Observable.combineLatest(
      this.tournamentsService.getUpcoming(),
      this.favoriteService.getFavoriteTournaments(),
      this.teamsService.getTeamRanking(),
(tournament, favorite, team) => {
//what triggered combinelatest to run?
}).subscribe()
Run Code Online (Sandbox Code Playgroud)

小智 10

实现这一点的一种非常干净和“rx”的方式是使用时间戳运算符http://reactivex.io/documentation/operators/timestamp.html

示例代码

sourceObservable
  .pipe(
    timestamp(),  // wraps the source items in object with timestamp of emit
    combineLatest( otherObservable.pipe( timestamp() ), function( source, other ) {

      if( source.timestamp > other.timestamp ) {

        // source emitted and triggered combineLatest
        return source.value;
      }
      else {

        // other emitted and triggered combineLatest
        return other.value;
      }

    } ),
  )
Run Code Online (Sandbox Code Playgroud)

如果在combineLatest()按时间戳对它们进行排序时涉及两个以上的 observables将能够检测到哪个触发了combineLatest().


ols*_*lsn 8

简短的回答是:你不知道。您可以实施一些解决方法,但是这真的很难看,我建议重新考虑用例为什么需要它,也许您可​​以更改架构。还请记住,您的函数的第一次执行将是在所有三个 observable 都发出至少 1 个值之后。

无论如何 - 可能的解决方法可能是:

let trigger = "";
Observable.combineLatest(
      this.tournamentsService.getUpcoming().do(() => trigger = "tournament"),
      this.favoriteService.getFavoriteTournaments().do(() => trigger = "favTournament"),
      this.teamsService.getTeamRanking().do(() => trigger = "teamRanking"),
(tournament, favorite, team) => {
   console.log(`triggered by ${trigger}`);
}).subscribe();
Run Code Online (Sandbox Code Playgroud)

如果你想根据触发的 observable 执行特定的操作,你应该利用每个 observable 并将它们用作单独的触发器,切换到组合触发器,它可能会稍微多一些代码但它更干净,你不会最终以丑陋的 if/else、switch/case-mess 和一些 hacky 解决方法结束 -另外你甚至有机会使用async-pipe 而不是手动订阅所有内容并更新局部变量(无论如何这是一个不好的做法) :

下面是一些示例代码,说明它的外观:

let upcoming$ = this.tournamentsService.getUpcoming();
let favorite$ = this.favoriteService.getFavoriteTournaments();
let rankings$ = this.teamsService.getTeamRanking();

let allData$ = Observable.combineLatest(
    upcoming$, favorite$, rankings$,
    (tournament, favorite, team) => {
        return {tournament, favorite, team};
    }
);

// initial call -> this SHOULD be redundant,
// but since I don't know your code in detail
// i've put this in - if you can remove it or not
// depends on the order your data coming in
allData$
    .take(1)
    .do(({tournament, favorite, team}) => {
        this.displayMatches(...);
        this.sortByFavorites(...);
        this.fillWithRanking(...);
    })
    .subscribe();

// individual update triggers
upcoming$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.displayMatches(...))
    .subscribe();

favorite$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.sortByFavorites(...))
    .subscribe();

rankings$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.fillWithRanking(...))
    .subscribe();
Run Code Online (Sandbox Code Playgroud)


car*_*ant 5

您可以使用该scan运算符将发出的值与任何先前发出的值进行比较,并且可以包含指示组合可观察量的组件是否实际发生变化的附加数据。例如:

let combined = Observable
  .combineLatest(
    this.tournamentsService.getUpcoming(),
    this.favoriteService.getFavoriteTournaments(),
    this.teamsService.getTeamRanking()
  )
  .scan((acc, values) => [
    ...values,
    acc[0] !== values[0],
    acc[1] !== values[1],
    acc[2] !== values[2]
  ], []);

combined.subscribe(
  ([tournament, favorite, team, tournamentChanged, favoriteChanged, teamChanged]) => {
    console.log(`tournament = ${tournament}; changed = ${tournamentChanged}`);
    console.log(`favorite = ${favorite}; changed = ${favoriteChanged}`);
    console.log(`team = ${team}; changed = ${teamChanged}`);
  }
);
Run Code Online (Sandbox Code Playgroud)


Yar*_*rin 5

我得出了以下解决方案,有点类似于 Cartant 提出的解决方案,但在扫描时使用成对代替,这看起来更优雅。成对运算符将先前发出的值保留在缓冲区中,并将先前的值和新发出的值作为数组提供给下一个运算符,因此您可以轻松检查值是否已更改并进一步传递结果。在我的示例中,为了清楚起见,我将其简化为仅 2 个 Observables。

combineLatest([obs1$, obs2$]).pipe(
    pairwise(),
    map(([oldValues, newValues]) => oldValues.map((value, i) => value !== newValues[i])),
    ).subscribe(([obs1$HasChanged, obs2$HasChanged]) => {
)
Run Code Online (Sandbox Code Playgroud)