如何从onmessage回调中创建一个observable?

Che*_*tah 4 javascript websocket rxjs

我已经习惯了在.NET和Java中使用它,我希望能够做到以下几点:

Rx.Observable.fromCallback(websocket.onmessage)
    .map(...)
    .subscribe(...);
Run Code Online (Sandbox Code Playgroud)

但是,控制台具有以下内容:

Uncaught TypeError: Rx.Observable.fromCallback(websocket.onmessage).map is not a function

这似乎表明它fromCallback没有返回一个Observable.

我在这做错了什么?我误解了fromCallback正在做什么,我需要使用Subject?我可以不在一个observable中包含一些任意的处理程序吗?

pau*_*els 10

你实际上在寻找fromEventfromEventPattern:

Rx.Observable.fromEvent(websocket, 'message').map(/*...*/).subscribe();

Rx.Observable.fromEventPattern(
  function add(h) { websocket.addEventListener(h); }, 
  function remove(h) { websocket.removeEventListener(h); })
 .map(/*...*/)
 .subscribe();
Run Code Online (Sandbox Code Playgroud)

第一个将尝试使用一些订阅事件发射器的标准方法,即WebSocket.但是,如果失败,您可以fromEventPattern改为指定如何在对象中添加或删除处理程序.

另外需要注意的是,JavaScript不传递对您正在使用的对象实例的隐式引用,如C#和Java所做的那样,因此您的代码fromCallback(websocket.onmessage)不会传递websocket,它会从函数原型传递给方法的引用.this将在执行时确定.

Rx.Observable.fromCallback用于最后一个参数是回调函数的函数,该函数是异步JavaScript代码的标准模式.此外,该fromCallback方法不返回Observable它返回一个函数,当被调用时返回一个Observableie

function methodWithCallback(arg0, arg1, cb) {
  setTimeout(function() {
    cb(arg0 + arg1);
  }, 2000);
}

var newMethod = Rx.Observable.fromCallback(methodWithCallback);

//[After 2 seconds] 3
newMethod(1, 2).subscribe(console.log.bind(console));
Run Code Online (Sandbox Code Playgroud)