ric*_*ong 5 javascript promise observable rxjs ecmascript-6
我最近一直挂在这个话题上。似乎 AsyncIterables 和 Observables 都具有类似流的特性,尽管它们的使用方式略有不同。
您可以像这样使用异步迭代
const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }
const main = async () => {
for await (const number of myAsyncIterable()) {
console.log(number)
}
}
main()Run Code Online (Sandbox Code Playgroud)
您可以像这样使用 observable
const Observable = rxjs
const { map } = rxjs.operators
Observable.of(1, 2, 3).subscribe(x => console.log(x))Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)
我的首要问题是基于这个RxJS pr
如果 observable 以比循环完成的速度更快的速度发出,则随着缓冲区变得更满,内存会增加。我们可以提供使用不同策略的其他方法(例如,仅使用最近的值等),但将其保留为默认值。请注意,循环本身可能有多个等待,这会加剧问题。
在我看来,异步迭代器本质上没有背压问题,那么Symbol.asyncIterator在 Observable 上实现(@@asyncIterator) 并默认为背压策略是否正确?鉴于 AsyncIterables,甚至需要 Observables 吗?
理想情况下,您可以通过代码示例向我展示 AsyncIterables 和 Observables 之间的实际差异。
主要区别在于哪一方决定何时进行迭代。
在异步迭代器的情况下,客户端通过调用await iterator.next(). 源决定何时解决承诺,但客户端必须首先请求下一个值。因此,消费者从源“拉”入数据。
Observable 注册了一个回调函数,当一个新值进来时,它会立即被 observable 调用。因此,源“推送”给消费者。
通过使用 Subject 并将其映射到异步迭代器的下一个值,可以轻松地使用 Observable 来使用异步迭代器。每当您准备好消费下一个项目时,您就会在 Subject 上调用 next 。这是一个代码示例
const pull = new Subject();
const output = pull.pipe(
concatMap(() => from(iter.next())),
map(val => {
if(val.done) pull.complete();
return val.value;
})
);
//wherever you need this
output.pipe(
).subscribe(() => {
//we're ready for the next item
if(!pull.closed) pull.next();
});
Run Code Online (Sandbox Code Playgroud)