将 RxJS Observable 转换为异步可迭代

loo*_*oop 6 javascript asynchronous iterable observable rxjs

有时我想使用 RxJS 运算符来操作无限的异步迭代而不缓冲值。将可迭代对象转换为可观察对象很容易。以下将 Observable 转换为异步可迭代的方法是否有缺点?

const iterable = async function* (observable) {
  let buffer = [],
    resolve,
    reject;
  const subscription = observable.subscribe({
    next: value => {
      if (resolve) {
        resolve(value);
        resolve = reject = undefined;
      } else {
        buffer.push(Promise.resolve(value));
      }
    },
    error: e => {
      if (reject) {
        reject(e);
        resolve = reject = undefined;
      }
    },
    complete: () => {},
  });
  while (!subscription.isStopped || buffer.length) {
    yield buffer.shift() ||
      new Promise((_resolve, _reject) => {
        resolve = _resolve;
        reject = _reject;
      });
  }
  subscription.unsubscribe();
};
Run Code Online (Sandbox Code Playgroud)

Codepen 演示


这是可迭代作为观察者的替代实现(不使用生成器)。

class IterableObserver {
  constructor(observable) {
    this.buffer = [];
    this.resolve = undefined;
    this.reject = undefined;
    this.isStopped = false;
    observable && observable.subscribe(this);
  }

  [Symbol.asyncIterator]() {
    const t = this;
    return {
      next: async () => {
        if (!t.isStopped || t.buffer.length) {
          if (t.buffer.length) {
            return {
              value: t.buffer.shift(),
            };
          } else {
            return new Promise((_resolve, _reject) => {
              t.resolve = _resolve;
              t.reject = _reject;
            });
          }
        } else {
          return { done: true };
        }
      },
    };
  }

  next(value) {
    if (this.resolve) {
      this.resolve({ value });
      this.resolve = this.reject = undefined;
    } else {
      this.buffer.push(value);
    }
  }
  error(e) {
    this.isStopped = true;
    if (this.reject) {
      this.reject(e);
      this.resolve = this.reject = undefined;
    }
  }
  complete() {
    this.isStopped = true;
  }
}
Run Code Online (Sandbox Code Playgroud)

这样做的好处受到质疑。假设您有一个 API,它通过函数makeTextFileLineIterator为您提供文本文件行的异步迭代,并且您需要为客户端提供大写的前 N ​​行的异步迭代。你会怎么做?使用 RxJS 运算符和可迭代转换,这将很容易:

const lineIterator = makeTextFileLineIterator('https://cdn.skypack.dev/rxjs@^7.1.0/operators?min');

const upperCaseLines = new IterableObserver(from(lineIterator).pipe(
  take(6),
  map(line=>line.toLocaleUpperCase())),
);
Run Code Online (Sandbox Code Playgroud)

Codepen 演示