除了@马克西姆的回答,您可以用比赛来创建你观察到的第一个值和超时之间的竞争.我们通过合并构建超时never使用timeout.
因此,我们最终会在您的source observable和一个永远不会发出值的observable之间进行竞争,但会在时间过去之后抛出错误.如果source observable产生第一个值,那么race将停止监听超时可观察值.
const timed = source.race(Observable.never().timeout(1000));
Run Code Online (Sandbox Code Playgroud)
据我所知,首先没有超时,这是一种解决方法:
(Plunkr 在这里可用)
const { Observable } = Rx;
// this function can be use with the let operator
// ex: someObs$.let(timeoutFirstOnly(100)).subscribe(...)
// it'll emit an error if the first value arrives later
// than timeout parameter value
const timeoutFirstOnly = timeout => observable$ => {
const obs$ = observable$.share();
const [firstValue$, others$] = obs$.partition((value, index) => index === 0);
return Observable.merge(firstValue$.timeout(timeout).first(), others$);
};
const created = Rx.Observable.create(observer => {
// here, this is working
// try to emit at 150ms for example and it'll throw an error
setTimeout(() => observer.next(42), 150);
});
created
.let(timeoutFirstOnly(100))
.subscribe(
val => console.log(`received ${val}`),
error => console.log(`threw ${error}`)
);
Run Code Online (Sandbox Code Playgroud)
但是,我觉得可能还有另一种方法,所以我在Gitter上问,@ Dorus给了我更好的结果:
created.publish(src =>
Rx.Observable.merge(
src.take(1).timeout(100),
src.skip(1)
)
);
Run Code Online (Sandbox Code Playgroud)
在这里说明,所有功劳归功于@Dorus:
publish 有2种口味:
.publish() : ConnectableObservable<T>.publish(selector : (src : Observable<T>) => Observable<R>):
Observable<R>.publish()返回a ConnectableObservable,可以通过调用refCount它来使用,也可以connect自己调用它来使用。
.publish(selector)让您在选择器中使用多播源并发出结果。顺便说一下,.publish()与相同,.multicast(() => new Subject())并且.publish(selector)相同.multicast(() => new Subject(), selector)。
也.publish().refCount()与几乎相同.share()。您share之前打过电话是正确的partition。那部分地解决了问题。我仍然喜欢.publish(selector)它,因为它更安全:在订阅可观察的源之前,.publish(selector)将运行选择器和subscribe所有源。使用共享时,第一个订阅将激活源,第二个订阅将丢失发出的所有同步内容。这可能与使用总同步源一样糟糕,然后第二个订阅将完全重新运行源。
最后一个添加项:.publish(selector)具有与相同的签名.let(selector),但let不多播源。就是这样。
| 归档时间: |
|
| 查看次数: |
1370 次 |
| 最近记录: |