反应式编程 - Node.js中的RxJS与EventEmitter

Kis*_*nti 49 javascript asynchronous reactive-programming node.js rxjs

最近我开始研究RxJS和RxJava(来自Netflix)的图书馆,这些图书馆致力于反应式编程的概念.

Node.js基于事件循环工作,它为您提供了异步编程的所有工具,后续的节点库(如"cluster")可帮助您充分利用多核机器.Node.js还为您提供了EventEmitter功能,您可以在其中订阅事件并以异步方式对其进行操作.

另一方面,如果我理解正确RxJS(和一般的反应式编程)工作原理事件流,订阅事件流,异步转换事件流数据.

所以,问题是在Node.js中使用Rx包是什么意思.Node的事件循环,事件发射器和订阅Rx的流和订阅有多么不同.

And*_*ltz 88

我的回答〜两年前是错的,我无法对其进行大幅编辑或删除.Observable与EventEmitters不同.在某些情况下,它们可能像EventEmitters一样,即当它们使用RxJS主题进行多播时,但通常它们不像EventEmitters那样.

简而言之,RxJS Subject就像一个EventEmitter,但RxJS Observable是一个更通用的接口.Observable更类似于零参数的函数.

考虑以下:


function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);
Run Code Online (Sandbox Code Playgroud)

当然,我们都希望看到输出:

"Hello"
42
"Hello"
42
Run Code Online (Sandbox Code Playgroud)

您可以在上面编写相同的行为,但使用Observables:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});
Run Code Online (Sandbox Code Playgroud)

输出是一样的:

"Hello"
42
"Hello"
42
Run Code Online (Sandbox Code Playgroud)

那是因为函数和Observable都是惰性计算.如果你不调用该函数,console.log('Hello')则不会发生.对于Observables,如果你没有"调用"(subscribe),那么console.log('Hello')就不会发生.另外,"调用"或"订阅"是一个独立的操作:两个函数调用触发两个单独的副作用,两个Observable订阅触发两个单独的副作用.与无论订阅者是否存在共享副作用并且急切执行的EventEmitters相反,Observables没有共享执行并且是懒惰的.


到目前为止,函数和Observable的行为没有区别.这个StackOverflow问题可以更好地表达为"RxJS Observables vs functions?".

有些人声称Observables是异步的.事实并非如此.如果您使用日志包围函数调用,如下所示:

console.log('before');
console.log(foo.call());
console.log('after');
Run Code Online (Sandbox Code Playgroud)

您显然会看到输出:

"before"
"Hello"
42
"after"
Run Code Online (Sandbox Code Playgroud)

这与Observables的行为相同:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');
Run Code Online (Sandbox Code Playgroud)

并输出:

"before"
"Hello"
42
"after"
Run Code Online (Sandbox Code Playgroud)

这证明订阅foo是完全同步的,就像一个功能.


那么Observable和函数之间真正区别的是什么呢?

Observable可以随时间"返回"多个值,而函数则不能.你不能这样做:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}
Run Code Online (Sandbox Code Playgroud)

函数只能返回一个值.但是,Observable可以这样做:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');
Run Code Online (Sandbox Code Playgroud)

使用同步输出:

"before"
"Hello"
42
100
200
"after"
Run Code Online (Sandbox Code Playgroud)

但您也可以异步"返回"值:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () {
    observer.next(300);
  }, 1000);
});
Run Code Online (Sandbox Code Playgroud)

随着输出:

"before"
"Hello"
42
100
200
"after"
300
Run Code Online (Sandbox Code Playgroud)

总之,

  • func.call()意思是" 立即(同步)给我一个值 "
  • obsv.subscribe()意思是" 给我价值.可能很多,也许同步,也许是异步的 "

这就是Observables是函数的泛化(没有参数).

  • 很好的解释,但与问题无关 (5认同)

Sai*_*ish 11

侦听器何时附加到 Emitter ?

使用事件发射器,只要发生了他们感兴趣的事件,就会通知侦听器。在事件发生后添加新的侦听器时,他将不知道过去的事件。新的监听器也不会知道之前发生的事件的历史。当然,我们可以手动编程我们的发射器和监听器来处理这个自定义逻辑。

使用反应流,订阅者获得从一开始就发生的事件流。所以他订阅的时间并不严格。现在他可以对流执行各种操作来获取他感兴趣的事件子流。

这样的好处就出来了:

  • 当我们需要处理随着时间推移发生的事件时
  • 它们发生的顺序
  • 事件发生的模式(比方说,在谷歌股票的每次买入事件之后,微软股票的卖出事件会在 5 分钟内发生)

高阶流:

高阶流是“流的流”:其事件值本身就是流的流。

对于事件发射器,一种方法是将相同的侦听器附加到多个事件发射器。当我们需要关联发生在不同发射器上的事件时,这会变得很复杂。

使用反应式流,这是轻而易举的。来自mostjs的示例(这是一个反应式编程库,类似于 RxJS 但性能更高)

const firstClick = most.fromEvent('click', document).take(1);
const mousemovesAfterFirstClick = firstClick.map(() =>
    most.fromEvent('mousemove', document)
        .takeUntil(most.of().delay(5000)))
Run Code Online (Sandbox Code Playgroud)

在上面的例子中,我们将点击事件与鼠标移动事件相关联。当事件作为流可用时,跨事件推断模式变得更容易完成。

话虽如此,使用 EventEmitter 我们可以通过过度设计我们的发射器和监听器来完成所有这些。它需要过度工程化,因为它最初并不适用于此类场景。而反应流可以如此流畅地做到这一点,因为它旨在解决此类问题。