Rx:即使调用了 onError,如何获取最后一个元素?

cde*_*zaq 5 java rx-java reactivex

我正在使用 RxJava,我需要做两件事:

  • 获取从发出的最后一个元素 Observable
  • 确定是否onError被调用,vs。onCompleted

我已经考虑过使用lastlastOrDefault(这实际上是我需要的行为),但我无法解决onError隐藏最后一个元素的问题。我可以使用 Observable 两次,一次获取last值,一次获取完成状态,但到目前为止,我只能通过创建自己的来完成此操作Observer

public class CacheLastObserver<T> implements Observer<T> {

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void onCompleted() {
        // Do nothing
    }

    @Override
    public void onError(Throwable e) {
        error.set(e);
    }

    @Override
    public void onNext(T message) {
        lastMessageReceived.set(message);
    }

    public Optional<T> getLastMessageReceived() {
        return Optional.ofNullable(lastMessageReceived.get());
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(error.get());
    }
}
Run Code Online (Sandbox Code Playgroud)

我自己制作没有问题Observer,但感觉 Rx 应该能够更好地满足“在完成之前发出最后一个元素”的用例。关于如何实现这一点的任何想法?

Dav*_*ten 4

尝试这个:

source.materialize().buffer(2).last()
Run Code Online (Sandbox Code Playgroud)

在错误情况下,最后一次发射将是两个项目的列表,即包装为 a 的最后一次发射值Notification和错误通知。如果没有错误,第二项将是完成通知。

另请注意,如果未发出任何值,则结果将是一个包含终端通知的一项的列表。