RxJs将流分成多个流

Ger*_*ári 8 javascript system.reactive rxjs

如何根据分组方法将永不结束的流拆分为多个结束流?

--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
Run Code Online (Sandbox Code Playgroud)

进入这些可观察者

--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>
Run Code Online (Sandbox Code Playgroud)

正如你所看到的那样,a它在开始时,在我收到之后b,我将不再a这样,它应该结束.这就是正常groupBy情况不好的原因.

mar*_*tin 6

您可以使用windowshare源Observable.还有一个小技巧bufferCount(2, 1):

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)

这打印(因为toArray()):

[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
Run Code Online (Sandbox Code Playgroud)

此解决方案的问题是订阅的顺序source.我们需要window通知程序在第一个之前订阅bufferCount.否则,首先进一步按下一个项目,然后检查它是否与前一个项目不同.filter(([oldValue, newValue]) ...).

这意味着需要先将发射延迟一次window(这是第一次.bufferCount(2, 1).map(arr => arr[0]).

或者也许我自己更容易控制订阅的顺序publish():

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();
Run Code Online (Sandbox Code Playgroud)

输出是一样的.