Node.js Streams vs. Observables

uri*_*ish 71 javascript reactive-programming node.js rxjs bacon.js

在了解了Observables之后,我发现它们与Node.js流非常相似.两者都有一种机制,可以在新数据到达时通知消费者,发生错误或没有更多数据(EOF).

我很想了解两者之间的概念/功能差异.谢谢!

m4k*_*tub 89

无论观测量和node.js中的让你解决同样的根本问题:异步处理值的序列.我相信,两者之间的主要区别在于它的外观背景.该上下文反映在术语和API中.

Observables方面,您有一个EcmaScript的扩展,它引入了反应式编程模型.它试图填补值生成和异步之间的间隙用的极简和可组合的概念ObserverObservable.

在node.js和Streams端,您希望为网络流和本地文件的异步和高性能处理创建一个接口.术语从初始上下文推导,你会得到pipe,chunk,encoding,flush,Duplex,Buffer,等由于具有务实的做法是提供您失去撰写的东西,因为它并不像一些统一的能力,特别是用例的明确支持.例如,您pushReadablewrite上使用,Writable但在概念上,您在做同样的事情:发布一个值.

因此,在实践中,如果你看的概念,如果你使用的选项{ objectMode: true },可以匹配ObservableReadable流和ObserverWritable流.您甚至可以在两个模型之间创建一些简单的适配器.

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}
Run Code Online (Sandbox Code Playgroud)

您可能已经注意到我更改了一些名称并使用了更简单的概念,ObserverSubscription在此处介绍,以避免Observables在其中完成的重复性过载Generator.基本上,Subscription允许您取消订阅Observable.无论如何,使用上面的代码你可以有一个pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
Run Code Online (Sandbox Code Playgroud)

与之相比process.stdin.pipe(process.stdout),您所拥有的是一种组合,过滤和转换流的方法,这些流也适用于任何其他数据序列.您可以使用Readable,TransformWritable流来实现它,但API支持子类化而不是链接Readable和应用函数.在Observable模型上,例如,转换值对应于将变换器函数应用于流.它不需要新的子类型Transform.

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))
Run Code Online (Sandbox Code Playgroud)

结论?在Observable任何地方都很容易引入反应模型和概念.围绕这个概念实现整个库更加困难.所有这些小功能需要始终如一地协同工作.毕竟,ReactiveX项目仍在继续.但是,如果你真的需要将文件内容发送到客户端,处理编码,然后在NodeJS中将其压缩到那里,并且它运行良好.

  • 我真的不确定这整个"延伸到Ecmascript的事情".RxJS只是一个库,与RxJava等相同.最后,在ES7或ES8中,ES/JS中可能存在一些与Observable相关的关键字,但它们肯定不是语言的一部分,当你回答这个问题时肯定不是在2015年. (4认同)