有时我想使用 RxJS 运算符来操作无限的异步迭代而不缓冲值。将可迭代对象转换为可观察对象很容易。以下将 Observable 转换为异步可迭代的方法是否有缺点?
const iterable = async function* (observable) {
let buffer = [],
resolve,
reject;
const subscription = observable.subscribe({
next: value => {
if (resolve) {
resolve(value);
resolve = reject = undefined;
} else {
buffer.push(Promise.resolve(value));
}
},
error: e => {
if (reject) {
reject(e);
resolve = reject = undefined;
}
},
complete: () => {},
});
while (!subscription.isStopped || buffer.length) {
yield buffer.shift() ||
new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject; …Run Code Online (Sandbox Code Playgroud)