arg*_*g20 4 javascript reactive-programming node.js rxjs
我正在使用RxJs来听amqp queu(不是真的相关).
我有一个函数createConnection返回一个Observable发出新连接对象的函数.一旦我有连接,我想每隔1000毫秒通过它发送消息,在10条消息之后我想要关闭连接.
我试图避免外部状态,但如果我不将连接存储在外部变量中,我该如何关闭它?请参阅我从连接开始,然后flatMap推送消息,因此在几个链之后我不再拥有连接对象.
这不是我的流程,但想象这样的事情:
createConnection()
.flatMap(connection => connection.createChannel())
.flatMap(channel => channel.send(message))
.do(console.log)
.subscribe(connection => connection.close()) <--- obviously connection isn't here
Run Code Online (Sandbox Code Playgroud)
现在我明白这样做是愚蠢的,但现在我如何访问连接?我当然可以先说var connection = createConnection()
然后以某种方式加入.但是我该怎么做?我甚至不知道如何正确地提出这个问题.Bottomline,我所拥有的是一个可观察的,发出连接,在连接打开后我想要一个每1000ms发出一次消息的observable(带a take(10)),然后关闭连接
你问题的直接答案是"你可以完成每一步".例如,您可以替换此行
.flatMap(connection => connection.createChannel())
Run Code Online (Sandbox Code Playgroud)
这一个:
.flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))
Run Code Online (Sandbox Code Playgroud)
并保持对连接的访问.
但还有另一种方法可以做你想做的事.假设你的createConnection和createChannel函数看起来像这样:
function createConnection() {
return Rx.Observable.create(observer => {
console.log('creating connection');
const connection = {
createChannel: () => createChannel(),
close: () => console.log('disposing connection')
};
observer.onNext(connection);
return Rx.Disposable.create(() => connection.close());
});
}
function createChannel() {
return Rx.Observable.create(observer => {
const channel = {
send: x => console.log('sending message: ' + x)
};
observer.onNext(channel);
// assuming no cleanup here, don't need to return disposable
});
}
Run Code Online (Sandbox Code Playgroud)
createConnection(并且createChannel,但我们将专注于前者)返回一个冷的可观察者; 每个订阅者将获得包含单个连接的自己的连接流,并且当该订阅到期时,将自动调用dispose逻辑.
这允许你做这样的事情:
const subscription = createConnection()
.flatMap(connection => connection.createChannel())
.flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i })))
.take(10)
.subscribe(x => x.channel.send(x.data))
;
Run Code Online (Sandbox Code Playgroud)
您实际上不必处理要进行清理的订阅; 在take(10)满意之后,整个链条将完成并且将触发清理.你明确需要在订阅上调用dispose的唯一原因是,如果你想在10 1000ms间隔之前解决问题.
请注意,此解决方案还包含对您的问题的直接答案的实例:我们将通道推送到该行,以便我们可以在传递给订阅调用的onNext lambda中使用它(通常在出现此类代码的情况下).
这是完整的工作:https://jsbin.com/korihe/3/edit?js,console,output
| 归档时间: |
|
| 查看次数: |
1211 次 |
| 最近记录: |