如何在NodeJ中正确地将WS服务器转换为没有主题的RXJS Api

Max*_*der 11 reactive-programming websocket node.js rxjs

将着名的ws模块转换为Node.js中的被动api的正确方法是什么?我知道主题可以帮助弥合非反应性事件的反应,但他们的问题是他们在处理他们的依赖对象时要困难得多.

var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({ port: 8080 });
var Rx = require('rx');  


var connectionMessageSubject = new Rx.Subject();

wss.on('connection', function connection(client) {
  ws.on('message', function incoming(message) {
    connectionMessageSubject.onNext({
      client: client,
      message: message
    });
  });
});
Run Code Online (Sandbox Code Playgroud)

我不能使用他们内置的fromEvent方法,因为它注册了很多不同的事件,当30个或更多客户端连接时,NodeJS会发出警告.

例如...

var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({port:8080});

var connectionMessageObservable;

//this uses a tremendous amount of memory and throws warnings that the event emitter has a maximum of 30 listeners 
wss.on('connection', function connection(client){
  connnectionMessageObservable = Rx.Observable.fromEvent(client, 'message');
});
Run Code Online (Sandbox Code Playgroud)

Raf*_*lis 1

以下代码模拟该subject行为。

var WebSocketServer = require('ws').Server;
var wss = new WebSocketServer({port:8080});

var connectionMessage$ = new Rx.Observable(function (observer) {
    wss.on('connection', function connection(client){
        client.on('message', function (message){
            observer.next({
                client: client,
                message: message,
            })
        });
    });    
});

connectionMessage$.subscribe(function (cm) {
    // cm.client for client
    // cm.message for message
});
Run Code Online (Sandbox Code Playgroud)