使用combineLatest 将 n+1 个流添加到 RXJS 流

Chr*_*ian 7 rxjs

希望有人能帮助我解决这个问题。我有 2 个流,需要在其上使用操作符 mergeLatest。一段时间后,我需要动态添加还需要使用combineLatest 的流。这是我需要做的:

stream a ---------a---------------b-------------c------------------------>
stream b ----1--------2-----3-------------------------4------------------>
stream c (not defined at start)      -----z-----------------x------------>
stream d (not defined at start)                         ----------k------>
                                 (combineLatest)
result   ---------(a1)(a2)--(a3)--(b3)----(b3z)-(c3z)-(c4z)-(c4x)-(c4xk)->   
Run Code Online (Sandbox Code Playgroud)

更新

更具体地说,我想打开此STREAM(链接) 在此输入图像描述

对于这个结果:

A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--H3a-H3b--I3b
Run Code Online (Sandbox Code Playgroud)

Ric*_*sen 3

采用 Oles 的答案,稍微简化并添加问题更新中给出的测试数据

const Subject = Rx.ReplaySubject
const ReplaySubject = Rx.ReplaySubject

const newStream =  new Subject()

// Set up output, no streams yet
const streamOfStreams = newStream
  .scan( (acc, stream) => {
    acc.push(stream);
    return acc; 
  }, [])
  .switchMap(vs => Observable.combineLatest(vs))
  .map(arrayOfValues => arrayOfValues.join(''))    // declutter
  .subscribe(console.log)

// Add a stream
const s1 = new ReplaySubject() 
newStream.next(s1)
// emit on streams
s1.next('A'); s1.next('B')

// Add a stream
const s2 = new ReplaySubject() 
newStream.next(s2)
// emit on streams
s2.next('0'); s1.next('C')
s1.next('D'); s2.next('1'); s1.next('E'); 

// Add a stream
const s3 = new ReplaySubject() 
newStream.next(s3)
// emit on streams
s3.next('a'); 
s1.next('F'); s2.next('2'); s1.next('G'); s2.next('3'); s1.next('H'); 
s3.next('b'); s1.next('I')
Run Code Online (Sandbox Code Playgroud)

工作示例:CodePen


更新

Christian 好心地提供了一些比我上面使用的排序主题更“真实世界”的测试流。不幸的是,这些突出了现有解决方案中的一个错误。

作为参考,新的测试流是

const streamA = Rx.Observable.timer(0,800).map(x => String.fromCharCode(x+ 65));
const streamB = Rx.Observable.timer(0,1300).map(x => x);
const streamC = Rx.Observable.timer(1100, 2000).map(x => String.fromCharCode(x+ 97));

setTimeout(() => newStream.next(streamA), 500);
setTimeout(() => newStream.next(streamB), 2000);
setTimeout(() => newStream.next(streamC), 3000);
Run Code Online (Sandbox Code Playgroud)

问题#1

第一个问题源于 中的核心线streamOfStreams

  .switchMap(vs => Observable.combineLatest(vs))
Run Code Online (Sandbox Code Playgroud)

这实质上是说,每次出现新的流数组时,将其映射到combineLatest()新数组并切换到新的可观察值。然而,测试可观察量是冷的,这意味着每次重新订阅都会获得完整的流。

参考:Rx 简介 - 热和冷可观察量

一些可观察的序列可能看起来很热,但实际上很冷。Observable.Interval 和 Observable.Timer 是令很多人惊讶的几个例子

所以我们得到
- 预期A--B--B0...
- 实际A--B--A0--B0...

显而易见的解决方案是将冷的可观测值变成热的,

const asHot = (stream) => {
  const hot = stream.multicast(() => new Rx.Subject())
  hot.connect()
  return hot
}
Run Code Online (Sandbox Code Playgroud)

但这从序列中省略了 B0 ,A--B--C0...所以我们需要 hot + 1 previous ,这可以通过缓冲区大小为 1 来获得

const asBuffered = (stream) => {
  const bufferOne = new ReplaySubject(1)
  stream.subscribe(value => bufferOne.next(value))
  return bufferOne
}
Run Code Online (Sandbox Code Playgroud)

问题#2

第二个问题来自于这样的事实:streamC 延迟了它的第一次发射 1100ms(很好的测试 Christian!)。

此结果是
- 预期A--B--B0--C0--D0--D1--E1--E1a...
- 实际A--B--B0--C0--D0--E1a...

这意味着我们需要延迟添加流,直到它第一次发出

const addStreamOnFirstEmit = (stream) => {
  const buffered = asBuffered(stream)
  buffered.first().subscribe( _ => {
    newStream.next(buffered)
  })
}
Run Code Online (Sandbox Code Playgroud)

工作示例:CodePen

CodePen 上的注释

我保留了各种streamAdder函数进行实验,还有一些_debug版本可以发出流和addStream 事件来显示序列。

还限制了源流,以便控制台不会滚动太多。

关于预期输出的注释

新的解决方案与“G3a”之后的问题中给出的预期输出不同

  • 预期的A----B---B0-C0--D0--D1--E1--F1---F2---F2a---G2a---G3a--H3a--H3b--I3b
  • 实际的A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--G3b--H3b--I3b

这是由于“H”和“b”同时发射所致。问题#3?

再进行一项测试

为了查看如果streamC将第一次发射延迟到streamA和streamB的两次发射之后解决方案是否失败,我将延迟更改为1800ms

const streamC = Rx.Observable.timer(1800, 2000).map(x => String.fromCharCode(x+ 97));
Run Code Online (Sandbox Code Playgroud)

我相信这个测试的输出是正确的。