我有一个消息队列处理器,可以将消息提供给服务......
q.on("message", (m) => {
service.create(m)
.then(() => m.ack())
.catch(() => n.nack())
})
Run Code Online (Sandbox Code Playgroud)
该服务使用 RxJS Observable 并订阅debounceTime()这些请求。
class Service {
constructor() {
this.subject = new Subject()
this.subject.debounceTime(1000)
.subscribe(({ req, resolve, reject }) =>
someOtherService.doWork(req)
.then(() => resolve())
.catch(() => reject())
)
}
create(req) {
return new Promise((resolve, reject) =>
this.subject.next({
req,
resolve,
reject
})
)
}
}
Run Code Online (Sandbox Code Playgroud)
问题是只有去抖请求才会得到 ackd/nackd。如何确保订阅也解决/拒绝其他请求? bufferTime()让我到达那里的一部分,但它不会重置每次调用的超时持续时间next()。