loo*_*oop 6 javascript asynchronous iterable observable rxjs
有时我想使用 RxJS 运算符来操作无限的异步迭代而不缓冲值。将可迭代对象转换为可观察对象很容易。以下将 Observable 转换为异步可迭代的方法是否有缺点?
const iterable = async function* (observable) {
let buffer = [],
resolve,
reject;
const subscription = observable.subscribe({
next: value => {
if (resolve) {
resolve(value);
resolve = reject = undefined;
} else {
buffer.push(Promise.resolve(value));
}
},
error: e => {
if (reject) {
reject(e);
resolve = reject = undefined;
}
},
complete: () => {},
});
while (!subscription.isStopped || buffer.length) {
yield buffer.shift() ||
new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
}
subscription.unsubscribe();
};
Run Code Online (Sandbox Code Playgroud)
这是可迭代作为观察者的替代实现(不使用生成器)。
class IterableObserver {
constructor(observable) {
this.buffer = [];
this.resolve = undefined;
this.reject = undefined;
this.isStopped = false;
observable && observable.subscribe(this);
}
[Symbol.asyncIterator]() {
const t = this;
return {
next: async () => {
if (!t.isStopped || t.buffer.length) {
if (t.buffer.length) {
return {
value: t.buffer.shift(),
};
} else {
return new Promise((_resolve, _reject) => {
t.resolve = _resolve;
t.reject = _reject;
});
}
} else {
return { done: true };
}
},
};
}
next(value) {
if (this.resolve) {
this.resolve({ value });
this.resolve = this.reject = undefined;
} else {
this.buffer.push(value);
}
}
error(e) {
this.isStopped = true;
if (this.reject) {
this.reject(e);
this.resolve = this.reject = undefined;
}
}
complete() {
this.isStopped = true;
}
}
Run Code Online (Sandbox Code Playgroud)
这样做的好处受到质疑。假设您有一个 API,它通过函数makeTextFileLineIterator为您提供文本文件行的异步迭代,并且您需要为客户端提供大写的前 N 行的异步迭代。你会怎么做?使用 RxJS 运算符和可迭代转换,这将很容易:
const lineIterator = makeTextFileLineIterator('https://cdn.skypack.dev/rxjs@^7.1.0/operators?min');
const upperCaseLines = new IterableObserver(from(lineIterator).pipe(
take(6),
map(line=>line.toLocaleUpperCase())),
);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1283 次 |
| 最近记录: |