Tar*_*alo 19 javascript rxjs angular
我有一个带有单个按钮的简单组件,该按钮可以启动和暂停由RxJS计时器生成的数字流。
import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';
@Component({
selector: 'my-app',
template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
active$ = new BehaviorSubject<boolean>(true);
ngOnInit(): void {
const on$ = this.active$.pipe(filter(v => v));
const off$ = this.active$.pipe(filter(v => !v));
const stream$ = timer(500, 500).pipe(share());
const out$ = merge(
stream$.pipe(
bufferToggle(off$, () => on$),
mergeAll(),
),
stream$.pipe(
windowToggle(on$, () => off$),
mergeAll(),
),
);
out$.subscribe(v => console.log(v));
}
toggle(): void {
this.active$.next(!this.active$.value);
}
}
Run Code Online (Sandbox Code Playgroud)
这可以完美工作,但我需要再添加一个功能!
我需要根据满足条件的流中的值自动暂停流。
例如,如果最新值为5的倍数,则暂停流。
您有任何想法如何做到这一点?
这是stackblitz上的一个可运行示例https://stackblitz.com/edit/angular-6hjznn
fri*_*doo 10
可以扩展当前的bufferToggle / windowToggle方法,也可以使用自定义的缓冲区实现。
您可以在之后的操作员队列中添加自定义缓冲区(数组)bufferToggle。当bufferToggle发出时,将这些值附加到您的自定义缓冲区。然后从自定义缓冲区中获取值,直到缓冲区中的某个元素与暂停条件匹配为止。发射这些值并暂停流。
该pausable运营商将发出符合停止条件的值,然后停止流直接算账。可以根据您的特定需求进行调整,例如使用较少的输入参数进行简化,或者share可以将其合并到中pausable。
export function pausable<T, O>(
on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values
off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
let buffer: T[] = [];
return merge(
source.pipe(
bufferToggle(off$, () => on$),
tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
),
source.pipe(
windowToggle(on$, () => off$),
mergeMap(x => x),
tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
),
);
});
}
Run Code Online (Sandbox Code Playgroud)
用法
active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));
interval(500).pipe(
share(),
pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);
pauseOn = (value: number) => value > 0 && value % 10 === 0
Run Code Online (Sandbox Code Playgroud)
考虑到pausable上面的运算符使用了大量输入参数,因此我使用完全自定义的缓冲区实现了第二个运算符,并且只有一个可观察到的输入可以打开或关闭缓冲区,这类似于Brandon的方法。
bufferIf当给定的值condition发出true并从缓冲区发出所有值或当conditionis 为时传递新值时,它将缓冲输入值false。
export function bufferIf<T>(condition: Observable<boolean>) {
return (source: Observable<T>) => defer(() => {
const buffer: T[] = [];
let paused = false;
let sourceTerminated = false;
return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
condition.pipe(map(v => [v, 1]))
).pipe( // add values from the source to the buffer or set the paused variable
tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean),
switchMap(_ => new Observable<T>(s => {
setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
while (buffer.length > 0 && !paused) s.next(buffer.shift())
}, 0)
})), // complete the stream when the source terminated and the buffer is empty
takeWhile(_ => !sourceTerminated || buffer.length > 0, true)
);
})
}
Run Code Online (Sandbox Code Playgroud)
用法
active$ = new BehaviorSubject<boolean>(true);
interval(500).pipe(
bufferIf(this.active$.pipe(map(v => !v))),
tap(value => this.pauseOn(value) ? this.active$.next(false) : null)
).subscribe(console.log);
pauseOn = (value: number) => value > 0 && value % 10 === 0
Run Code Online (Sandbox Code Playgroud)
这是一个自定义的暂停运算符,当暂停信号为时,它只会在缓冲区中累积值,而当信号true为时,将一个一个地发出它们false。
将其与简单的tap运算符结合使用,可以在值达到特定条件时切换行为主题暂停信号,并且单击按钮时会有一些暂停,并且在条件满足时也会暂停(在这种情况下为12的倍数):
这是pause操作员:
function pause<T>(pauseSignal: Observable<boolean>) {
return (source: Observable<T>) => Observable.create(observer => {
const buffer = [];
let paused = false;
let error;
let isComplete = false;
function notify() {
while (!paused && buffer.length) {
const value = buffer.shift();
observer.next(value);
}
if (!buffer.length && error) {
observer.error(error);
}
if (!buffer.length && isComplete) {
observer.complete();
}
}
const subscription = pauseSignal.subscribe(
p => {
paused = !p;
setTimeout(notify, 0);
},
e => {
error = e;
setTimeout(notify, 0);
},
() => {});
subscription.add(source.subscribe(
v => {
buffer.push(v);
notify();
},
e => {
error = e;
notify();
},
() => {
isComplete = true;
notify();
}
));
return subscription;
});
}
Run Code Online (Sandbox Code Playgroud)
这是它的用法:
const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
pause(this.active$),
tap(value => {
if (CONDITION(value)) {
this.active$.next(false);
}
}));
this.d = out$.subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)
还有一个工作示例:https : //stackblitz.com/edit/angular-bvxnbf
| 归档时间: |
|
| 查看次数: |
575 次 |
| 最近记录: |