从生产者方面取消Observable,而不是消费者方面

Ale*_*lls 5 reactive-programming observable rxjs

从消费者方面取消,可能会使用takeUntil调用,但这不一定非常动态.但是,在这种情况下,我希望从等式的生产者方面取消Observable,就像您希望在promise链中取消Promise一样(本机实用程序不太可能).

假设我从一个方法返回了这个Observable.(这个Queue库是一个读/写文本文件的简单持久队列,我们​​需要锁定读/写,这样就不会有任何损坏).

Queue.prototype.readUnique = function () {

    var ret = null;
    var lockAcquired = false;

    return this.obsEnqueue
        .flatMap(() => acquireLock(this))
        .flatMap(() => {
            lockAcquired = true;
            return removeOneLine(this)
        })
        .flatMap(val => {
            ret = val;   // this is not very good
            return releaseLock(this);
        })
        .map(() => {
            return JSON.parse(ret);
        })
        .catch(e => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return genericObservable();
            }
        });

};
Run Code Online (Sandbox Code Playgroud)

我有两个不同的问题 -

  1. 如果我无法获取锁定,我如何"取消"observable,只返回一个没有结果的空Observable?我是否真的必须在每个返回调用中执行if/else逻辑来确定当前链是否被取消,如果是,则返回一个空的Observable?通过为空,我的意思是一个Observable,简单地触发onNext/onComplete,没有任何错误的可能性,也没有onNext的任何值.从技术上讲,我不认为这是一个空的Observable,所以我正在寻找真正被称为它的东西,如果它存在的话.

  2. 如果你看一下这个特定的代码序列:

    .flatMap(() => acquireLock(this))
    .flatMap(() => {
        lockAcquired = true;
        return removeOneLine(this)
    })
    .flatMap(val => {
        ret = val;
        return releaseLock(this);
    })
    .map(() => {
        return JSON.parse(ret);
    })
    
    Run Code Online (Sandbox Code Playgroud)

我正在做的是在方法的顶部存储对ret的引用,然后在稍后再次引用它.我正在寻找的是一种将从removeOneLine()触发的值传递给JSON.parse()的方法,而不必在链外设置一些状态(这简直是不优雅的).

ols*_*lsn 3

1)这取决于你的方法如何acquireLock工作 - 但我假设如果无法获取锁,它会抛出错误,在这种情况下,你可以使用 a创建catch并将后备流设置为空流:

return Rx.Observable.catch(
        removeLine$,
        Rx.Observable.empty()
    );
Run Code Online (Sandbox Code Playgroud)

2)为了节省有状态的外部变量,您可以简单地链接一个mapTo

let removeLine$ = acquireLock(this)
    .flatMap(() => this.obsEnqueue
        .flatMap(() => removeOneLine(this))
        .flatMap(val => releaseLock(this).mapTo(val))
        .map(val => JSON.parse(val))
        .catch(() => releaseLock(this))
    );
Run Code Online (Sandbox Code Playgroud)