小编Jam*_*mes的帖子

对 rxjs 订阅进行反跳和缓冲

我有一个消息队列处理器,可以将消息提供给服务......

q.on("message", (m) => {
  service.create(m)
    .then(() => m.ack())
    .catch(() => n.nack())
})
Run Code Online (Sandbox Code Playgroud)

该服务使用 RxJS Observable 并订阅debounceTime()这些请求。

class Service {
  constructor() {
    this.subject = new Subject()
    this.subject.debounceTime(1000)
      .subscribe(({ req, resolve, reject }) =>
        someOtherService.doWork(req)
          .then(() => resolve())
          .catch(() => reject())
      )
  }

  create(req) {
    return new Promise((resolve, reject) =>
      this.subject.next({
        req,
        resolve,
        reject
      })
    )
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是只有去抖请求才会得到 ackd/nackd。如何确保订阅也解决/拒绝其他请求? bufferTime()让我到达那里的一部分,但它不会重置每次调用的超时持续时间next()

rxjs rxjs5

5
推荐指数
2
解决办法
3212
查看次数

标签 统计

rxjs ×1

rxjs5 ×1