Mar*_* An 4 rxjs reactive-streams
我有一个这种形式的值流:
[1,2,3,1,2,1,1,1,2...]
Run Code Online (Sandbox Code Playgroud)
我想将其转换为以下形式的组流:
[[1,2,3],[1,2],[1],[1],[1,2]...]
Run Code Online (Sandbox Code Playgroud)
每次值变为 1 时都应创建新组。
您可以使用bufferWhen()运算符来收集发出的值,直到 observable 发出一个值。在这个例子中,我使用一个主题将流的浅拷贝发送到缓冲区。
每当发出数字时,缓冲区就会1发出。如果流以 开头,1则发出一个空数组。所以我过滤掉了。
const {from, Subject} = rxjs;
const {filter, bufferWhen, tap, skip} = rxjs.operators;
const stream = from([1,2,3,1,2,1,1,1,2]);
const trigger = new Subject();
stream.pipe(
tap(v => trigger.next(v)),
bufferWhen(() => trigger.pipe(filter(v => v === 1))),
filter(v => v.length)
).subscribe(x => console.log(x));Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>Run Code Online (Sandbox Code Playgroud)
如果您想在值比前一个值减少时发出触发器,则可以使用scan()。这在逻辑上可能更好一点,但作为触发器使用适合这个问题。1
| 归档时间: |
|
| 查看次数: |
373 次 |
| 最近记录: |