从流onError恢复的惯用方法

zer*_*kms 10 javascript reactive-programming reactive-extensions-js rxjs

免责声明:它是以前针对2个依赖流问题的安全更新的延续

处理允许流不终止的RxJS(或任何其他RX实现)中的错误的惯用方法是什么?

相关代码是

function convert(unit, value) {
    var request = {};
    request[unit] = value;

    var conversion = $.ajax({
        method: 'POST',
        url: './convert.php',
        data: request,
        dataType: 'json'
    }).promise();

    return Rx.Observable.fromPromise(conversion).takeUntil(inInput.merge(cmInput));
}

var cmElement = document.getElementById('cm'),
    inElement = document.getElementById('in');

var cmInput = Rx.Observable.fromEvent(cmElement, 'input').map(targetValue),
    inInput = Rx.Observable.fromEvent(inElement, 'input').map(targetValue);

var inches = cmInput
    .flatMap(convert.bind(null, 'cm'))
    .startWith(0);

var centimeters = inInput
    .flatMap(convert.bind(null, 'in'))
    .startWith(0);
Run Code Online (Sandbox Code Playgroud)

因此,您可以看到我们使用输入字段更改流convert并将其传递给将其转换为另一个单元的函数,并进一步传递结果.

如果在$.ajax()调用期间发生错误,则它会向上传播,整个inchescetimeters流停止(实际上是预期的).

但是如何实现它呢?

这样我就可以优雅地处理错误,比如显示错误消息,并在新数据到达时再试一次?

我目前的想法是引入像Haskell这样的复合类型Data.Either并将其流式传输而不是标量双精度型.

思考?

UPD:是的,我已经在不停止序列的情况下阅读了处理Reactive Extensions中的异常,但我仍然希望有更好的方法.

Bra*_*don 6

你真的有两个选择:

  1. 如你所说,返回某种形式Either可能是结果或错误.

由于这是JavaScript,您显然不需要正式类型,只能将错误实例与数字一起传输,并且您的订阅者可以通过检查接收到的值的运行时类型来告知它们何时接收它们.因此,这就像.catch(function (e) { return Rx.Observable.of(e); }.fromPromise调用之后添加一样简单(或者.promise()使用.then()错误过滤器来生成一个承诺,当发生错误时,它将具有您想要的任何值).

  1. 在单独的流上发送错误.

基本上有convert另一个参数,它应该用来发出错误的观察者:

function convert(errorObserver, unit, value) {
    ...
    return Rx.Observable
        .fromPromise(conversion)
        .catch(function (e) {
            errorObserver.onNext(e); // or whatever you want to emit here
            return Rx.Observable.empty(); // or possibly Rx.Observable.of(0) to reset?
        })
        ...
}
Run Code Online (Sandbox Code Playgroud)

然后只需Subject为您的错误流创建一个并将其作为第一个参数提供convert.如果您希望将cm错误与错误分开,则创建2个主题in.

我个人倾向于使用第一种方法.