可暂停的RxJS流

Tar*_*alo 19 javascript rxjs angular

我有一个带有单个按钮的简单组件,该按钮可以启动和暂停由RxJS计时器生成的数字流。

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

这可以完美工作,但我需要再添加一个功能!

我需要根据满足条件的流中的值自动暂停流。

例如,如果最新值为5的倍数,则暂停流。


您有任何想法如何做到这一点?

这是stackblitz上的一个可运行示例https://stackblitz.com/edit/angular-6hjznn

fri*_*doo 10

可以扩展当前的bufferToggle / windowToggle方法,也可以使用自定义的缓冲区实现。

1.扩展bufferToggle / windowToggle方法

您可以在之后的操作员队列中添加自定义缓冲区(数组)bufferToggle。当bufferToggle发出时,将这些值附加到您的自定义缓冲区。然后从自定义缓冲区中获取值,直到缓冲区中的某个元素与暂停条件匹配为止。发射这些值并暂停流。

可操作的运算符(Demo

pausable运营商将发出符合停止条件的值,然后停止流直接算账。可以根据您的特定需求进行调整,例如使用较少的输入参数进行简化,或者share可以将其合并到中pausable

export function pausable<T, O>(
  on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values 
  off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
  haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
  pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
  spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    let buffer: T[] = [];
    return merge(
      source.pipe(
        bufferToggle(off$, () => on$),
        tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
        map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
        tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
        map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
        mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
      ),
      source.pipe(
        windowToggle(on$, () => off$),
        mergeMap(x => x),
        tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
      ),
    );
  });
}
Run Code Online (Sandbox Code Playgroud)

用法

active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));

interval(500).pipe(
  share(),
  pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0
Run Code Online (Sandbox Code Playgroud)

2.完全自定义的缓冲区

考虑到pausable上面的运算符使用了大量输入参数,因此我使用完全自定义的缓冲区实现了第二个运算符,并且只有一个可观察到的输入可以打开或关闭缓冲区,这类似于Brandon的方法

bufferIf运算符(Demo

bufferIf当给定的值condition发出true并从缓冲区发出所有值或当conditionis 为时传递新值时,它将缓冲输入值false

export function bufferIf<T>(condition: Observable<boolean>) {
  return (source: Observable<T>) => defer(() => {
    const buffer: T[] = [];
    let paused = false;
    let sourceTerminated = false;
    return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
      source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
      condition.pipe(map(v => [v, 1]))
    ).pipe( // add values from the source to the buffer or set the paused variable
      tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean), 
      switchMap(_ => new Observable<T>(s => {
        setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
          while (buffer.length > 0 && !paused) s.next(buffer.shift())
        }, 0)
      })), // complete the stream when the source terminated and the buffer is empty
      takeWhile(_ => !sourceTerminated || buffer.length > 0, true) 
    );
  })
} 
Run Code Online (Sandbox Code Playgroud)

用法

active$ = new BehaviorSubject<boolean>(true);

interval(500).pipe(
  bufferIf(this.active$.pipe(map(v => !v))),
  tap(value => this.pauseOn(value) ? this.active$.next(false) : null)
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0
Run Code Online (Sandbox Code Playgroud)


Bra*_*don 7

这是一个自定义的暂停运算符,当暂停信号为时,它只会在缓冲区中累积值,而当信号true为时,将一个一个地发出它们false

将其与简单的tap运算符结合使用,可以在值达到特定条件时切换行为主题暂停信号,并且单击按钮时会有一些暂停,并且在条件满足时也会暂停(在这种情况下为12的倍数):

这是pause操作员:

function pause<T>(pauseSignal: Observable<boolean>) {
  return (source: Observable<T>) => Observable.create(observer => {
    const buffer = [];
    let paused = false;
    let error;
    let isComplete = false;

    function notify() {
      while (!paused && buffer.length) {
        const value = buffer.shift();
        observer.next(value);
      }

      if (!buffer.length && error) {
        observer.error(error);
      }

      if (!buffer.length && isComplete) {
        observer.complete();
      }
    }

    const subscription = pauseSignal.subscribe(
      p => {
        paused = !p;
        setTimeout(notify, 0);
      },
      e => {
        error = e;
        setTimeout(notify, 0);
      },
      () => {});

    subscription.add(source.subscribe(
      v => {
        buffer.push(v);
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {
        isComplete = true;
        notify();
      }
    ));

    return subscription;
  });
}
Run Code Online (Sandbox Code Playgroud)

这是它的用法:

const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
  pause(this.active$),
  tap(value => {
    if (CONDITION(value)) {
      this.active$.next(false);
    }
  }));

this.d = out$.subscribe(v => console.log(v));
Run Code Online (Sandbox Code Playgroud)

还有一个工作示例:https : //stackblitz.com/edit/angular-bvxnbf