当第一个作为参数传递的 observables 完成时完成 combineLatest 的变体

Pic*_*cci 6 reactive-programming rxjs

combineLatest运算符返回一个 Observable,当所有作为参数传入的 observable 完成时,该 ObservablecombineLatest完成。

有没有办法创建一个 Observable,它的行为与返回的 ObservablecombineLatest相同,唯一的区别是当第一个作为参数传递的 Observables完成时它完成?

car*_*ant 5

是的,你可以;你可以这样做:

function combineLatestUntilFirstComplete(...args) {
  const shared = args.map(a => a.share());
  return Rx.Observable
    .combineLatest(...shared)
    .takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}

const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);

combineLatestUntilFirstComplete(a, b).subscribe(
  value => console.log(JSON.stringify(value)),
  error => console.error(error),
  () => console.log("complete")
);
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

该实现从内部调用返回的 observable 中获取值,combineLatest直到源 observable 之一发出其最后一个值。

请注意,源 observable 是共享的,因此由于takeUntil调用而产生的订阅不会影响对冷源 observable 的二级订阅。