来自NodeJS Express'Re post'的ReactiveX Observable

4 asynchronous reactive-programming node.js express rxjs

是否可以从以下位置创建ReactiveX Observable:

app = express();
app.post('/path', function() {...})
Run Code Online (Sandbox Code Playgroud)

我的意思是,有一种方法可以创建一个observable fromEvent,其中我已经使用了许多已经注册的事件,object.on('eventName', function(){})但是来自express的帖子并不完全相同.

Mar*_*ten 7

Rx-ify一路表达?

尝试将快速路由的回调式实现转换为Rx的额外复杂性让我得出结论,这不是前进的方法.这是因为快递不知道Rx因此没有订阅寿命和多事件发射的概念.

马丁的答案是一个概念验证,您可以清楚地看到上述问题.这意味着对于每条路线,您需要创建一个单独的Subjectshare()一个主题,然后为每个路径制作大量过滤器以分离实现.这是多余的,因为express已经将您的请求路由到正确的处理程序.

将Rx世界与快速回调相结合

如果你希望将Expressjs路由与RxJs结合起来,我会将路由逻辑分开,并将其保留在快速回调中,并且实际实现使用Rx结合a .toPromise()来激活它:

app.post('/user/:id', (req, res) => {
  return getUserById(req.params.id)
    .toPromise()/* return the Rx stream as promise to express so it traces its lifecycle */
    .then(
      user => res.send(user),
      err => res.status(500).send(err.message)
    );
});    

function getUserById(id) {
  // stub implementation
  return Rx.Observable.of({ id, name: 'username' }) 
    .delay(100);
}
Run Code Online (Sandbox Code Playgroud)


mar*_*tin 6

我认为最简单的方法是使用a Subject重新发送事件express.

var express = require('express');
var Rx = require('rxjs');
var app = express();

let subject = new Rx.Subject();

app.get('/', (req, res) => subject.next([req, res]));

subject
  .do(a => console.log('123345'))
  .subscribe(args => {
    let [req, res] = args;
    res.send('Hello World!');
  });

app.listen(3000, function () {
  console.log('Example app listening on port 3000!')
});
Run Code Online (Sandbox Code Playgroud)

主要区别在于您需要将数据包装[req, res]为数组,因为您希望将两者都传递给数组subject.next(...).这可以在以后解压缩let [req, res] = args;.

不幸的是,你无法使用,Observable.bindCallback因为它只需要包装函数的一个响应,然后完成现在你想要的在这种情况下.

  • 不,在RxJS 5中,该方法只被称为`next()`.较旧的RxJS 4中使用了诸如`onNext()`之类的名称. (2认同)