对 rxjs 订阅进行反跳和缓冲

Jam*_*mes 5 rxjs rxjs5

我有一个消息队列处理器,可以将消息提供给服务......

q.on("message", (m) => {
  service.create(m)
    .then(() => m.ack())
    .catch(() => n.nack())
})
Run Code Online (Sandbox Code Playgroud)

该服务使用 RxJS Observable 并订阅debounceTime()这些请求。

class Service {
  constructor() {
    this.subject = new Subject()
    this.subject.debounceTime(1000)
      .subscribe(({ req, resolve, reject }) =>
        someOtherService.doWork(req)
          .then(() => resolve())
          .catch(() => reject())
      )
  }

  create(req) {
    return new Promise((resolve, reject) =>
      this.subject.next({
        req,
        resolve,
        reject
      })
    )
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是只有去抖请求才会得到 ackd/nackd。如何确保订阅也解决/拒绝其他请求? bufferTime()让我到达那里的一部分,但它不会重置每次调用的超时持续时间next()

Ign*_*tos 11

对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为类似于debounce()+buffer()前面的答案中的

我调用了它bufferDebounce,Typescript 中带有类型推断的代码片段如下:

import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer =>
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    }),
  );
Run Code Online (Sandbox Code Playgroud)

您可以在此示例中测试其行为,以检查这是否适合您https://stackblitz.com/edit/rxjs6-buffer-debounce


car*_*ant 6

您当前使用的运算debounceTime符可用于创建一个可观察对象,该可观察对象可以通知buffer何时应关闭当前缓冲区。

然后,buffer将发出在去抖动时收到的消息数组,您可以对它们执行任何您想要的操作:

this.subject = new Subject();
const closingNotifier = this.subject.debounceTime(1000);
this.subject.buffer(closingNotifier).subscribe(messages => {
  const last = messages.length - 1;
  messages.forEach(({ req, resolve, reject }, index) => {
    if (index === last) {
      /* whatever you are doing, now, with the debounced message */
    } else {
      /* whatever you need to do with the ignored messages */
    }
  });
});
Run Code Online (Sandbox Code Playgroud)

  • 那么为什么没有“bufferDebounce”运算符呢?我犯了一个错误,就是执行 `buffer(100)`,并且我只是偶然发现我每 100 毫秒就会得到一个空数组,永远!我本以为“buffer(100)”如果为空的话不会发出任何东西。我认为如果 10 秒内没有任何内容进入,此版本不会运行回调。 (2认同)