RxJs:zip运算符的有损形式

JeB*_*JeB 6 javascript reactive-programming rxjs backpressure rxjs5

考虑使用zip操作符将两个无限的Observable压缩在一起,其中一个Observable发出的项目频率是另一个的两倍.
当前的实现是无损耗的,即如果我让这些Observables发射一小时然后我在它们的发射速率之间切换,第一个Observable将最终赶上另一个.
随着缓冲区越来越大,这将导致内存爆炸.
如果第一个observable将发出几个小时的项目而第二个将在最后发出一个项目,则会发生同样的情况.

如何为此运营商实现有损行为?我只是想随时从两个流中获得排放,而我不关心我错过的更快流量的排放量.

澄清:

  • 我试图在这里解决的主要问题是由于zip操作员的无损性质导致的内存爆炸.
  • 即使两个流每次都发出相同的值,我想随时从两个流中获得发射

例:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70
Run Code Online (Sandbox Code Playgroud)

Regular zip会产生以下输出:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
Run Code Online (Sandbox Code Playgroud)

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

我希望它产生的输出:

[1, 10]
[3, 20]
[5, 30]
Run Code Online (Sandbox Code Playgroud)

说明:
有损zip运算符zip具有缓冲区大小1.这意味着它只会保留首先发出的流中的第一个项目,并将丢弃所有其余项目(第一个项目和第二个项目的第一个项目之间到达的项目).因此,示例中发生的情况如下:stream1发出1,有损zip会"记住"它并忽略所有项目stream1直到stream2发出.第一次排放stream210如此stream1松散2.在相互发射(第一次有损发射zip)后,它开始:"记住" 3,"松散" 4,发射[3,20].然后开始了:"记得" 5,"松" 6,并7放出[5,30].然后开始了:"记得" 40,"松" 50,60,70和等待的下一个项目stream1.

例2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a
Run Code Online (Sandbox Code Playgroud)

zip在这种情况下,常规运算符将爆炸内存.
我不想要它.

总结:
基本上我希望有损zip运算符只记住stream 1 先前相互发射之后发出的第一个值,并在stream 2赶上时发出stream 1.并重复一遍.

JeB*_*JeB 7

以下内容将为您提供所需的行为:

Observable.zip(s1.take(1), s2.take(1)).repeat()
Run Code Online (Sandbox Code Playgroud)

RxJs 5.5管道语法中:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
Run Code Online (Sandbox Code Playgroud)

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)

说明:

  • repeat运算符(在其当前实现中)在后者完成时重新订阅可观察到的源,即在该特定情况下,它重新订阅zip每次相互发射.
  • zip结合两个可观察量并等待它们两个发射.combineLatest也会这样做,因为它并不重要take(1)
  • take(1) 实际上会处理内存爆炸并定义有损行为

如果你想在相互发射时从每个流中获取最后一个而不是第一个值,请使用:

Observable.combineLatest(s1, s2).take(1).repeat()
Run Code Online (Sandbox Code Playgroud)

RxJs 5.5管道语法中:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
Run Code Online (Sandbox Code Playgroud)

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Run Code Online (Sandbox Code Playgroud)