rxjs超时到第一个值

Dan*_*sky 9 timeout sequence rxjs

所以我从这个问题中了解到这里我理解timeout操作符错误,如果一个observable在给定的时间窗口内没有发出任何值...我的问题是,这个时间窗口在每次发射后重置,这使得有必要如果您只对窗口内的第一个值发出感兴趣,请完成序列...

有没有一个很好的方法来"超时到第一"?除了.take(1).timeout(1000)

Bra*_*don 8

除了@马克西姆的回答,您可以用比赛来创建你观察到的第一个值和超时之间的竞争.我们通过合并构建超时never使用timeout.

因此,我们最终会在您的source observable和一个永远不会发出值的observable之间进行竞争,但会在时间过去之后抛出错误.如果source observable产生第一个值,那么race将停止监听超时可观察值.

const timed = source.race(Observable.never().timeout(1000));
Run Code Online (Sandbox Code Playgroud)

  • RXJS 6+ Equiv:`race(source $,NEVER.pipe(timeout(1000)));` (2认同)

max*_*992 5

据我所知,首先没有超时,这是一种解决方法:
(Plunkr 在这里可用)

const { Observable } = Rx;

// this function can be use with the let operator
// ex: someObs$.let(timeoutFirstOnly(100)).subscribe(...)
// it'll emit an error if the first value arrives later
// than timeout parameter value
const timeoutFirstOnly = timeout => observable$ => {
  const obs$ = observable$.share();

  const [firstValue$, others$] = obs$.partition((value, index) => index === 0);

  return Observable.merge(firstValue$.timeout(timeout).first(), others$);
};

const created = Rx.Observable.create(observer => {
  // here, this is working
  // try to emit at 150ms for example and it'll throw an error
  setTimeout(() => observer.next(42), 150);
});

created
  .let(timeoutFirstOnly(100))
  .subscribe(
    val => console.log(`received ${val}`),
    error => console.log(`threw ${error}`)
  );
Run Code Online (Sandbox Code Playgroud)

但是,我觉得可能还有另一种方法,所以我在Gitter上问,@ Dorus给了我更好的结果:

created.publish(src =>
  Rx.Observable.merge(
    src.take(1).timeout(100),
    src.skip(1)
  )
);
Run Code Online (Sandbox Code Playgroud)

在这里说明,所有功劳归功于@Dorus

publish 有2种口味:

  1. .publish() : ConnectableObservable<T>
  2. .publish(selector : (src : Observable<T>) => Observable<R>): Observable<R>

.publish()返回a ConnectableObservable,可以通过调用refCount它来使用,也可以connect自己调用它来使用。 .publish(selector)让您在选择器中使用多播源并发出结果。顺便说一下,.publish()与相同,.multicast(() => new Subject())并且.publish(selector)相同.multicast(() => new Subject(), selector)

.publish().refCount()与几乎相同.share()。您share之前打过电话是正确的partition。那部分地解决了问题。我仍然喜欢.publish(selector)它,因为它更安全:在订阅可观察的源之前,.publish(selector)将运行选择器和subscribe所有源。使用共享时,第一个订阅将激活源,第二个订阅将丢失发出的所有同步内容。这可能与使用总同步源一样糟糕,然后第二个订阅将完全重新运行源。

最后一个添加项:.publish(selector)具有与相同的签名.let(selector),但let不多播源。就是这样。