Ger*_*ári 8 javascript system.reactive rxjs
如何根据分组方法将永不结束的流拆分为多个结束流?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
Run Code Online (Sandbox Code Playgroud)
进入这些可观察者
--a--a-a-a-a-|
b---b-b--b-|
c-c---c-c-|
d-d-d-|
e...>
Run Code Online (Sandbox Code Playgroud)
正如你所看到的那样,a它在开始时,在我收到之后b,我将不再a这样,它应该结束.这就是正常groupBy情况不好的原因.
您可以使用window和share源Observable.还有一个小技巧bufferCount(2, 1):
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
.bufferCount(2, 1) // delay emission by one item
.map(arr => arr[0])
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)
这打印(因为toArray()):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
Run Code Online (Sandbox Code Playgroud)
此解决方案的问题是订阅的顺序source.我们需要window通知程序在第一个之前订阅bufferCount.否则,首先进一步按下一个项目,然后检查它是否与前一个项目不同.filter(([oldValue, newValue]) ...).
这意味着需要先将发射延迟一次window(这是第一次.bufferCount(2, 1).map(arr => arr[0]).
或者也许我自己更容易控制订阅的顺序publish():
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
.window(source
.bufferCount(2, 1) // keep the previous and current item
.filter(([oldValue, newValue]) => oldValue !== newValue)
)
.concatMap(obs => obs.toArray())
.subscribe(console.log);
connectable.connect();
Run Code Online (Sandbox Code Playgroud)
输出是一样的.
| 归档时间: |
|
| 查看次数: |
3508 次 |
| 最近记录: |