AsyncIterable 和 Observable 之间的实际区别是什么?

ric*_*ong 5 javascript promise observable rxjs ecmascript-6

我最近一直挂在这个话题上。似乎 AsyncIterables 和 Observables 都具有类似流的特性,尽管它们的使用方式略有不同。

您可以像这样使用异步迭代

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()
Run Code Online (Sandbox Code Playgroud)

您可以像这样使用 observable

const Observable = rxjs
const { map } = rxjs.operators

Observable.of(1, 2, 3).subscribe(x => console.log(x))
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

我的首要问题是基于这个RxJS pr

如果 observable 以比循环完成的速度更快的速度发出,则随着缓冲区变得更满,内存会增加。我们可以提供使用不同策略的其他方法(例如,仅使用最近的值等),但将其保留为默认值。请注意,循环本身可能有多个等待,这会加剧问题。

在我看来,异步迭代器本质上没有背压问题,那么Symbol.asyncIterator在 Observable 上实现(@@asyncIterator) 并默认为背压策略是否正确?鉴于 AsyncIterables,甚至需要 Observables 吗?

理想情况下,您可以通过代码示例向我展示 AsyncIterables 和 Observables 之间的实际差异。

ran*_*992 7

主要区别在于哪一方决定何时进行迭代。

在异步迭代器的情况下,客户端通过调用await iterator.next(). 源决定何时解决承诺,但客户端必须首先请求下一个值。因此,消费者从源“拉”入数据。

Observable 注册了一个回调函数,当一个新值进来时,它会立即被 observable 调用。因此,源“推送”给消费者。

通过使用 Subject 并将其映射到异步迭代器的下一个值,可以轻松地使用 Observable 来使用异步迭代器。每当您准备好消费下一个项目时,您就会在 Subject 上调用 next 。这是一个代码示例

const pull = new Subject();
const output = pull.pipe(
  concatMap(() => from(iter.next())),
  map(val => { 
    if(val.done) pull.complete();
    return val.value;
  })
);
//wherever you need this 
output.pipe(

).subscribe(() => {
  //we're ready for the next item
  if(!pull.closed) pull.next();
});

Run Code Online (Sandbox Code Playgroud)