RxJS + node.js HTTP服务器实现?

8 node.js rxjs

我一直很好地使用node.js直到RxJS实现.

这是我的试用代码研究 -

Reactive-Extensions/rxjs-node https://github.com/Reactive-Extensions/rxjs-node


rx_http.js
(node.js的http lib的RxJS包装器)

var Rx = require("./rx.min");
var http = require("http");
for(var k in http)
{
    exports[k] = http[k];
}
exports.createServer = function ()
{
    var subject = new Rx.AsyncSubject();
    var observable = subject.asObservable();
    observable.server = http.createServer(function (request, response)
    {
       subject.onNext({ request:request, response:response });
       subject.onCompleted();
    });
    return observable;
};
Run Code Online (Sandbox Code Playgroud)

server.js

var http = require('./rx_http');

// rxServer
var serverObservable = http.createServer();
var port = 3000;
serverObservable.server.listen(port);
console.log("Server listening on port: "+port);

// HTTP request event loop function
serverObservable.subscribe(function (data)
{
    var req = data.request;
    console.log(req.headers);

    var res = data.response;
    res.writeHead(200, {'Content-Type':"text/html"});
    res.end("hello world");

    console.log("res content out");
});

// exceptiopn
process.on('uncaughtException', function (err)
{
    console.log(['Caught exception:', err.message].join(" "));
});
Run Code Online (Sandbox Code Playgroud)

代码最终向浏览器输出一次"hello world"输出,并且RxServer停止对另一个访问(brwoser reload等)做出反应.

我正在学习RxJS的事情,但在网上找到的文档很少.

告诉我代码有什么问题,如果你知道更好的实现,请分享.谢谢.

hjy*_*hjy 5

在rx_http.js中使用Rx.Subject而不是Rx.AsyncSubject.

AsyncSubject缓存onNext()的最后一个值,并在完成时将其传播给所有观察者.AsyncSubject

exports.createServer = function ()
{
    var subject = new Rx.Subject();
    var observable = subject.asObservable();
    observable.server = http.createServer(function (request, response)
    {
       subject.onNext({ request:request, response:response });
    });
    return observable;
};
Run Code Online (Sandbox Code Playgroud)


小智 3

当第一个请求到达时,对主题调用 oncompleted 会结束可观察序列。您能否删除该行并重试。

我希望它有帮助。

艾哈迈德·阿里·阿卡斯