RxJs避免外部状态,但仍然访问以前的值

arg*_*g20 4 javascript reactive-programming node.js rxjs

我正在使用RxJs来听amqp queu(不是真的相关).

我有一个函数createConnection返回一个Observable发出新连接对象的函数.一旦我有连接,我想每隔1000毫秒通过它发送消息,在10条消息之后我想要关闭连接.

我试图避免外部状态,但如果我不将连接存储在外部变量中,我该如何关闭它?请参阅我从连接开始,然后flatMap推送消息,因此在几个链之后我不再拥有连接对象.

这不是我的流程,但想象这样的事情:

createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => channel.send(message))
  .do(console.log)
  .subscribe(connection => connection.close()) <--- obviously connection isn't here
Run Code Online (Sandbox Code Playgroud)

现在我明白这样做是愚蠢的,但现在我如何访问连接?我当然可以先说var connection = createConnection()

然后以某种方式加入.但是我该怎么做?我甚至不知道如何正确地提出这个问题.Bottomline,我所拥有的是一个可观察的,发出连接,在连接打开后我想要一个每1000ms发出一次消息的observable(带a take(10)),然后关闭连接

Mat*_*ell 5

你问题的直接答案是"你可以完成每一步".例如,您可以替换此行

.flatMap(connection => connection.createChannel())
Run Code Online (Sandbox Code Playgroud)

这一个:

.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))
Run Code Online (Sandbox Code Playgroud)

并保持对连接的访问​​.

但还有另一种方法可以做你想做的事.假设你的createConnection和createChannel函数看起来像这样:

function createConnection() {
  return Rx.Observable.create(observer => {
    console.log('creating connection');
    const connection = {
      createChannel: () => createChannel(),
      close: () => console.log('disposing connection')
    };

    observer.onNext(connection);

    return Rx.Disposable.create(() => connection.close());
  });
}

function createChannel() {
  return Rx.Observable.create(observer => {
    const channel = {
      send: x => console.log('sending message: ' + x)
    };

    observer.onNext(channel);

    // assuming no cleanup here, don't need to return disposable
  });
}
Run Code Online (Sandbox Code Playgroud)

createConnection(并且createChannel,但我们将专注于前者)返回一个冷的可观察者; 每个订阅者将获得包含单个连接的自己的连接流,并且当该订阅到期时,将自动调用dispose逻辑.

这允许你做这样的事情:

const subscription = createConnection()
  .flatMap(connection => connection.createChannel())
  .flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i })))
  .take(10)
  .subscribe(x => x.channel.send(x.data))
;
Run Code Online (Sandbox Code Playgroud)

您实际上不必处理要进行清理的订阅; 在take(10)满意之后,整个链条将完成并且将触发清理.你明确需要在订阅上调用dispose的唯一原因是,如果你想在10 1000ms间隔之前解决问题.

请注意,此解决方案还包含对您的问题的直接答案的实例:我们将通道推送到该行,以便我们可以在传递给订阅调用的onNext lambda中使用它(通常在出现此类代码的情况下).

这是完整的工作:https://jsbin.com/korihe/3/edit?js,console,output