希望有人能帮助我解决这个问题。我有 2 个流,需要在其上使用操作符 mergeLatest。一段时间后,我需要动态添加还需要使用combineLatest 的流。这是我需要做的:
stream a ---------a---------------b-------------c------------------------>
stream b ----1--------2-----3-------------------------4------------------>
stream c (not defined at start) -----z-----------------x------------>
stream d (not defined at start) ----------k------>
(combineLatest)
result ---------(a1)(a2)--(a3)--(b3)----(b3z)-(c3z)-(c4z)-(c4x)-(c4xk)->
Run Code Online (Sandbox Code Playgroud)
更新
更具体地说,我想打开此STREAM(链接)

对于这个结果:
A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--H3a-H3b--I3b
Run Code Online (Sandbox Code Playgroud)
采用 Oles 的答案,稍微简化并添加问题更新中给出的测试数据
const Subject = Rx.ReplaySubject
const ReplaySubject = Rx.ReplaySubject
const newStream = new Subject()
// Set up output, no streams yet
const streamOfStreams = newStream
.scan( (acc, stream) => {
acc.push(stream);
return acc;
}, [])
.switchMap(vs => Observable.combineLatest(vs))
.map(arrayOfValues => arrayOfValues.join('')) // declutter
.subscribe(console.log)
// Add a stream
const s1 = new ReplaySubject()
newStream.next(s1)
// emit on streams
s1.next('A'); s1.next('B')
// Add a stream
const s2 = new ReplaySubject()
newStream.next(s2)
// emit on streams
s2.next('0'); s1.next('C')
s1.next('D'); s2.next('1'); s1.next('E');
// Add a stream
const s3 = new ReplaySubject()
newStream.next(s3)
// emit on streams
s3.next('a');
s1.next('F'); s2.next('2'); s1.next('G'); s2.next('3'); s1.next('H');
s3.next('b'); s1.next('I')
Run Code Online (Sandbox Code Playgroud)
工作示例:CodePen
Christian 好心地提供了一些比我上面使用的排序主题更“真实世界”的测试流。不幸的是,这些突出了现有解决方案中的一个错误。
作为参考,新的测试流是
const streamA = Rx.Observable.timer(0,800).map(x => String.fromCharCode(x+ 65));
const streamB = Rx.Observable.timer(0,1300).map(x => x);
const streamC = Rx.Observable.timer(1100, 2000).map(x => String.fromCharCode(x+ 97));
setTimeout(() => newStream.next(streamA), 500);
setTimeout(() => newStream.next(streamB), 2000);
setTimeout(() => newStream.next(streamC), 3000);
Run Code Online (Sandbox Code Playgroud)
问题#1
第一个问题源于 中的核心线streamOfStreams,
.switchMap(vs => Observable.combineLatest(vs))
Run Code Online (Sandbox Code Playgroud)
这实质上是说,每次出现新的流数组时,将其映射到combineLatest()新数组并切换到新的可观察值。然而,测试可观察量是冷的,这意味着每次重新订阅都会获得完整的流。
一些可观察的序列可能看起来很热,但实际上很冷。Observable.Interval 和 Observable.Timer 是令很多人惊讶的几个例子
所以我们得到
- 预期A--B--B0...
- 实际A--B--A0--B0...
显而易见的解决方案是将冷的可观测值变成热的,
const asHot = (stream) => {
const hot = stream.multicast(() => new Rx.Subject())
hot.connect()
return hot
}
Run Code Online (Sandbox Code Playgroud)
但这从序列中省略了 B0 ,A--B--C0...所以我们需要 hot + 1 previous ,这可以通过缓冲区大小为 1 来获得
const asBuffered = (stream) => {
const bufferOne = new ReplaySubject(1)
stream.subscribe(value => bufferOne.next(value))
return bufferOne
}
Run Code Online (Sandbox Code Playgroud)
问题#2
第二个问题来自于这样的事实:streamC 延迟了它的第一次发射 1100ms(很好的测试 Christian!)。
此结果是
- 预期A--B--B0--C0--D0--D1--E1--E1a...
- 实际A--B--B0--C0--D0--E1a...
这意味着我们需要延迟添加流,直到它第一次发出
const addStreamOnFirstEmit = (stream) => {
const buffered = asBuffered(stream)
buffered.first().subscribe( _ => {
newStream.next(buffered)
})
}
Run Code Online (Sandbox Code Playgroud)
工作示例:CodePen
我保留了各种streamAdder函数进行实验,还有一些_debug版本可以发出流和addStream 事件来显示序列。
还限制了源流,以便控制台不会滚动太多。
新的解决方案与“G3a”之后的问题中给出的预期输出不同
A----B---B0-C0--D0--D1--E1--F1---F2---F2a---G2a---G3a--H3a--H3b--I3b A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--G3b--H3b--I3b 这是由于“H”和“b”同时发射所致。问题#3?
为了查看如果streamC将第一次发射延迟到streamA和streamB的两次发射之后解决方案是否失败,我将延迟更改为1800ms
const streamC = Rx.Observable.timer(1800, 2000).map(x => String.fromCharCode(x+ 97));
Run Code Online (Sandbox Code Playgroud)
我相信这个测试的输出是正确的。