RxJS mixLatest 每个路径仅发出一次

Ada*_* B. 5 javascript rxjs rxjs-marbles

是否有任何方法/模式可以使用combineLatest()或其他一些运算符,以便如果组合的可观察量相互依赖,则它们对于 DAG 中具有相同原点的每组路径仅发出一次?我想用图可能更容易解释。

图表:

   A C
  /| |
 B | |
  \|/
   D
Run Code Online (Sandbox Code Playgroud)

这里 B 订阅了 A,D 订阅了 A、B 和 C。默认行为是,如果 A 发出,D 发出两次:当 A 发出时一次,当 B 发出时再一次(由于 A 发出) )。我希望它在两者都发射后只发射一次。然而,如果 C 发出,那么 D 也应该立即发出。

这是代码:

const A = new Rx.BehaviorSubject(1);
const B = A.pipe(Rx.map((x) => x + 1));
const C = new Rx.BehaviorSubject(3);

const D = Rx.combineLatest({ A, B, C });

D.subscribe(console.log); // {A: 1, B: 2, C: 3}
A.next(2); // Two emissions: {A: 2, B: 2, C: 3}, {A: 2, B: 3, C: 3} 
// Would like the second one ONLY, i.e. {A: 2, B: 3, C: 3}
C.next(4); // Correctly emits: {A: 2, B: 3, C: 4}
Run Code Online (Sandbox Code Playgroud)

我尝试过的一种解决方案是将其线性化并使 A、B 和 C 的整个集合成为可观察的:

 {A, C}
    |
{A, C, B}
    |
   {D}
Run Code Online (Sandbox Code Playgroud)

这可行,但我想知道是否有更好的方法。

Pic*_*cci 2

如果我正确理解了这个问题,那么您希望最终的 ObservableD在任何时候A( 的上游B)发出或C发出,但是,如果A发出,您还需要 发出的值B

如果是这种情况,我会确保任何时候B发出,它不仅发出 的值B,还发出其上游通知的值A,然后我将传递给combineLatestonlyBC,像这样

const A = new BehaviorSubject(1);
const B = A.pipe(map((x) => [x, x + 1]));
const C = new BehaviorSubject(3);

const D = combineLatest({ B, C }).pipe(
  map((val) => ({ A: val.B[0], B: val.B[1], C: val.C }))
);
Run Code Online (Sandbox Code Playgroud)

  • 下游,即在最终订阅中,每当 A 发出时,无论您有多少“派生”Observables/流,您都希望收到一个通知。如果是这种情况,您不应该创建 Observable“B”,而应该将一个“map”运算符附加到 Observable“A”,将 Observable“A”通知的任何值转换为您感兴趣的元组。这意味着创建一系列转换,从 A 发出的值开始,然后一路组合。我认为这个链的一般实现可能非常复杂。也许在更受限的情况下是可行的。 (2认同)