从不完整的可观察值中获取最后一个值

Est*_*ask 7 observable rxjs rxjs5

存在一个不完整的可观察量,它可以具有或不具有 n 个值的重播。我想从中获取最后一个值 - 或者如果还没有,则仅获取下一个值。

这适用于使用first()and的第一个可用值take(1)示例):

possiblyReplayedIncomplteObservable.first().toPromise().then(val => ...);
Run Code Online (Sandbox Code Playgroud)

但对于最后一个值,两者last()takeLast(1)等待可观察的完成 - 这不是这里理想的行为。

如何解决这个问题?有专门的运营商吗?

Mei*_*eir 4

我有一个解决方案ReplaySubject(2),可以“耗尽”序列以获取最新元素,如果序列为空,则只需获取最后一个元素,但是,它很麻烦并且不能很好地扩展(例如,如果您决定增加重播大小)至 3)。然后我想起重播/行为主题在通过管道传送时往往很难管理。最简单的解决方案是创建一个“影子”序列并将您的 ReplaySubject 通过管道传输到其中(而不是通过对 ReplaySubject 进行转换/操作来创建它),因此:

var subject$ = new Rx.ReplaySubject(3);
var lastValue$ = new Rx.ReplaySubject(1);
subject$.subscribe(lastValue$); // short hand for subject$.subscribe(v => lastValue$.next(v))

lastValue$.take(1).toPromise().then(...);
Run Code Online (Sandbox Code Playgroud)

========== 旧的解决方案,忽略 ReplaySubject(2) =================

阅读下面的评论后,正确的代码是:

Rx.Observable.combineLatest(possiblyReplayedIncomplteObservable).take(1).subscribe(...)
Run Code Online (Sandbox Code Playgroud)

并不是

Rx.Observable.combineLatest(possiblyReplayedIncomplteObservable).subscribe(...)
Run Code Online (Sandbox Code Playgroud)

这是因为该承诺是“一次性”可观察的。我认为toPromise()代码只有在完成后才能解析结果。

不会take(1)影响您的原始流,因为它在由 创建的新流上运行combineLatest

实际上,最简单的方法是:

possiblyReplayedIncomplteObservable.take(1).toPromise().then(...)
Run Code Online (Sandbox Code Playgroud)