JeB*_*JeB 6 javascript reactive-programming rxjs backpressure rxjs5
考虑使用zip操作符将两个无限的Observable压缩在一起,其中一个Observable发出的项目频率是另一个的两倍.
当前的实现是无损耗的,即如果我让这些Observables发射一小时然后我在它们的发射速率之间切换,第一个Observable将最终赶上另一个.
随着缓冲区越来越大,这将导致内存爆炸.
如果第一个observable将发出几个小时的项目而第二个将在最后发出一个项目,则会发生同样的情况.
如何为此运营商实现有损行为?我只是想随时从两个流中获得排放,而我不关心我错过的更快流量的排放量.
澄清:
zip
操作员的无损性质导致的内存爆炸.例:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
Run Code Online (Sandbox Code Playgroud)
Regular zip
会产生以下输出:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
Run Code Online (Sandbox Code Playgroud)
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)
我希望它产生的输出:
[1, 10]
[3, 20]
[5, 30]
Run Code Online (Sandbox Code Playgroud)
说明:
有损zip
运算符zip
具有缓冲区大小1
.这意味着它只会保留首先发出的流中的第一个项目,并将丢弃所有其余项目(第一个项目和第二个项目的第一个项目之间到达的项目).因此,示例中发生的情况如下:stream1
发出1
,有损zip会"记住"它并忽略所有项目stream1
直到stream2
发出.第一次排放stream2
是10
如此stream1
松散2
.在相互发射(第一次有损发射zip
)后,它开始:"记住" 3
,"松散" 4
,发射[3,20]
.然后开始了:"记得" 5
,"松" 6
,并7
放出[5,30]
.然后开始了:"记得" 40
,"松" 50
,60
,70
和等待的下一个项目stream1
.
例2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
Run Code Online (Sandbox Code Playgroud)
zip
在这种情况下,常规运算符将爆炸内存.
我不想要它.
总结:
基本上我希望有损zip
运算符只记住stream 1
先前相互发射之后发出的第一个值,并在stream 2
赶上时发出stream 1
.并重复一遍.
以下内容将为您提供所需的行为:
Observable.zip(s1.take(1), s2.take(1)).repeat()
Run Code Online (Sandbox Code Playgroud)
在RxJs 5.5
管道语法中:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
Run Code Online (Sandbox Code Playgroud)
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)
说明:
repeat
运算符(在其当前实现中)在后者完成时重新订阅可观察到的源,即在该特定情况下,它重新订阅zip
每次相互发射.zip
结合两个可观察量并等待它们两个发射.combineLatest
也会这样做,因为它并不重要take(1)
take(1)
实际上会处理内存爆炸并定义有损行为如果你想在相互发射时从每个流中获取最后一个而不是第一个值,请使用:
Observable.combineLatest(s1, s2).take(1).repeat()
Run Code Online (Sandbox Code Playgroud)
在RxJs 5.5
管道语法中:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
Run Code Online (Sandbox Code Playgroud)
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1677 次 |
最近记录: |