如何在Angular 2中使用RXJS观察自定义事件?

Shi*_*esh 4 javascript rxjs rxjs5 angular

我有一个第三方库,打算与RxJS集成。这是一个称为Tiger Text的消息传递库。根据他们的说法,我可以听一个称为消息的事件,当流中有消息时,我可以使用它来进一步利用它。相同的代码段如下:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' })

client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session)
})

function onSignedIn(session) {
  console.log('Signed in as', session.user.displayName)

  client.messages.sendToUser(
    'someone@mail.com',
    'hello!'
  ).then(function (message) {
    console.log('sent', message.body, 'to', message.recipient.displayName)
  })

  client.events.connect()

  client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })
}
Run Code Online (Sandbox Code Playgroud)

现在,请看一下以下提到的代码所在的位置。

client.on('message', function (message) {
    console.log(
      'message event',
      message.sender.displayName,
      'to',
      message.recipient.displayName,
      ':',
      message.body
    )
  })
Run Code Online (Sandbox Code Playgroud)

我想知道如何使用RxJS来从这段代码中创建一个可观察的内容,以便订阅流,并且每当我们进行更改时,我都会根据需要获取新数据并对其进行处理。

请指教。

car*_*ant 5

您可以使用fromEventPattern从自定义事件创建可观察对象:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEventPattern';

const messages = Observable.fromEventPattern(
  handler => client.on('message', handler),
  handler => client.off('message', handler)
);
messages.subscribe(message => console.log(message));
Run Code Online (Sandbox Code Playgroud)

您可以fromEventPattern使用自定义 API 的添加和删除机制传递给添加和删​​除事件处理程序的函数。您没有将它包含在您的问题中,但我假设您使用的 API 实现了一个off方法。


mar*_*tin 5

对于这种用例,通常不需要编写自定义Observable,而只需使用即可Observable.create()。然后,这取决于您要编写冷的还是热的可观察物。

对于冷的 Observable,您可以在订阅时创建值的生产者,在取消订阅时将其关闭:

Observable.create(obs => {
  var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
  client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
    onSignedIn(session);
  });

  client.on('message', function (message) {
    obs.next(...);
  });

  return () => {
    client.close(); // or whatever...
  };
});
Run Code Online (Sandbox Code Playgroud)

或者,如果您想编写一个热门的 Observable,则生产者将在任何订阅中独立存在,而只需添加/删除侦听器:

var client = new TigerConnect.Client({ defaultOrganizationId: 'some-org-id' });
client.signIn('user@mail.com', 's3cr3t', { udid: 'unique-device-id' }).then(function (session) {
  onSignedIn(session);
});

Observable.create(obs => {
  let listener = client.on('message', function (message) {
    obs.next(...);
  });

  () => {
    // remove the event listener somehow
    listener.remove();
  };
});
Run Code Online (Sandbox Code Playgroud)

有时您可以使用a来解决此问题,Subject但这通常比使用a 更为复杂,Observable.create()因为这时您需要自己处理创建和拆卸逻辑,并且Subject拥有内部状态。

这是一个与您的问题非常相似的问题:

RxJS的主要开发人员针对与您的问题相关的主题的文章: