每当另一个Observable发射时,跳过源中的下一个发射的运算符

Bre*_*rto 8 reactive-programming rxjs rxjs5

我有一个用例,每当另一个通知者Observable发出时,我都需要Observable跳过其下一个发出。

source:    |---X---X---X---X---X---X---X---X---X---X--|>
notifier:  |-------------X---------X----------X-------|>
result:    |---X---X---X-------X---X-------X-------X--|>
Run Code Online (Sandbox Code Playgroud)

基本上,我希望有一个叫做的运算符skipNextWhen,它以可观察的方式接收通知程序,并跳过源中的下一个发射。

我尝试使用使用pausable运算符的实现(使用来重新实现switchMap),但无法使其正常工作。

停顿的

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        pausable: typeof pausable;
    }
}

function pausable<T>(notifier: Observable<boolean>): Observable<T> {
    return notifier.startWith(false).switchMap((paused) => {
        if (paused) {
            return Observable.never();
        } else {
            const source = new Subject();
            this.subscribe(source);
            return source;
        }
    });
}

Observable.prototype.pausable = pausable;
Run Code Online (Sandbox Code Playgroud)

skipNextWhen.ts

import { Observable } from 'rxjs/Observable';
import './pausable';

declare module 'rxjs/Observable' {
    interface Observable<T> {
        skipNextWhen: typeof skipNextWhen;
    }
}

function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
    const notifier = Observable.merge(this.map(() => false), 
                                      other.map(() => true));
    return this.pausable(notifier);
}

Observable.prototype.skipNextWhen = skipNextWhen;
Run Code Online (Sandbox Code Playgroud)

我应该考虑使用更适合的运算符吗?我在当前实现中看到的行为是,结果Observable发出一次,然后再也不会发出-即使通知者Observable从未发出。

mar*_*tin 6

我可以想到两种解决方案:

  1. 使用.filter().do()以及一些副作用。

    即使不是那种“Rx”方式,这也可能更容易理解解决方案:

    function skipNextWhen(other) {
        let skipNext = false;
    
        return this.merge(other.do(() => skipNext = true).filter(() => false))
            .filter(val => {
                const doSkip = skipNext;
                skipNext = false;
                return !doSkip;
            });
    }
    
    Run Code Online (Sandbox Code Playgroud)

    merge()只是用来更新skipNextother的值总是被忽略。

  2. 使用.scan()

    该解决方案没有任何状态变量和副作用。

    function skipNextWhen(other) {
        const SKIP = 'skip';
    
        return this.merge(other.mapTo(SKIP))
            .scan((acc, val) => {
                if (acc === SKIP) {
                    return null;
                } else if (val === SKIP) {
                    return SKIP;
                } else {
                    return val;
                }
            }, [])
            .filter(val => Boolean(val) && val !== SKIP);
    }
    
    Run Code Online (Sandbox Code Playgroud)

    基本上,当SKIP到达时我会立即返回它,因为它会被操作员再次在acc参数中传递scan(),然后被filter().

    如果我收到一个正常值但先前的值是SKIP我忽略它并返回null稍后过滤掉的值。

两种解决方案都给出了相同的结果:

Observable.prototype.skipNextWhen = skipNextWhen;

const source = Observable.range(1, 10)
    .concatMap(val => Observable.of(val).delay(100));

source
    .skipNextWhen(Observable.interval(350))
    .subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)

这将打印以下内容:

1
2
3
5
6
8
9
10
Run Code Online (Sandbox Code Playgroud)

请注意,您实际上并不是在创建新的运算符。您只有操作员链的快捷方式。例如,这不会让您other在源完成时取消订阅。


Eri*_*ton 5

我已经启动了一个(非常)小的库,其中包含我想要的一些 rxjs 实用程序。它恰好有一个功能可以完全按照您的要求执行:skipAfter。从文档:

source: -1-----2-----3-----4-----5-|
skip$:  ----0----------0-0----------

result: -1-----------3-----------5-|
Run Code Online (Sandbox Code Playgroud)

图书馆在这里:https : //github.com/simontonsoftware/s-rxjs-utils