RxJava:调用onError而不完成/取消订阅

Jaa*_*tum 4 error-handling rx-java

我有以下代码(*),它使用递归调用提供的observable的调度程序实现轮询.

(*)灵感来自https://github.com/ReactiveX/RxJava/issues/448

当我只将onNext事件传递给订阅者时,这正常工作.但是当我将onError事件传递给订阅者时,将调用取消订阅事件,这反过来会导致调度程序被杀死.

我还想将错误传递给订阅者.任何想法如何实现?

public Observable<Status> observe() {
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS));
}

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> {
    private Subscription subscription;
    private Subscription innerSubscription;
    private Scheduler.Worker worker = Schedulers.newThread().createWorker();

    private Observable<T> observable;
    private long delayTime;
    private TimeUnit unit;

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) {
        this.observable = observable;
        this.delayTime = delayTime;
        this.unit = unit;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        subscription = worker.schedule(new Action0() {
            @Override
            public void call() {
                schedule(subscriber, true);
            }
        });

        subscriber.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                subscription.unsubscribe();
                if (innerSubscription != null) {
                    innerSubscription.unsubscribe();
                }
            }
        }));
    }

    private void schedule(final Subscriber<? super T> subscriber, boolean immediately) {
        long delayTime = immediately ? 0 : this.delayTime;
        subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit);
    }

    private Action0 createInnerAction(final Subscriber<? super T> subscriber) {
        return new Action0() {
            @Override
            public void call() {
                innerSubscription = observable.subscribe(new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Doesn't work.
                        // subscriber.onError(e);
                        schedule(subscriber, false);
                    }

                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                });
            }
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

krp*_*krp 5

onError和onCompleted都是终止事件,这意味着你的Observable在任何事件发生后都不会发出任何新事件.为了吞下/处理错误案例,请参阅错误操作符 - https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators.此外,为了实现轮询,您可以利用这一点 - http://reactivex.io/documentation/operators/interval.html


Wil*_*ill 4

所以我已经玩这个有一段时间了,我认为按照你的方式是不可能的。调用onErroronCompleted终止流,翻转包装器done内的标志SafeSubscriber,并且没有办法重置它。

我可以看到两个可用的选项 - 我认为都不是特别优雅,但会起作用。

1 - UnsafeSubscribe. 可能不是最好的主意,但它确实有效,因为它不是将 your 包装Subscriber在 a 中SafeSubscriber,而是直接调用它。最好阅读Javadoc,看看这是否适合您。或者,如果您喜欢冒险,可以自己编写,SafeSubscriber可以重置完成标志或类似标志。以您的示例为例,调用如下:

observe.unsafeSubscribe(...)
Run Code Online (Sandbox Code Playgroud)

2 - 实现与此示例类似的内容。我很欣赏它是用 C# 编写的,但它应该是可读的。简而言之 - 您想要创建一个Pair<T, Exception>类,然后不调用onError,而是调用onNext并设置您的对的异常端。您的订阅者必须更加聪明地检查该对的每一侧,并且您可能需要在源Observable和 之间进行一些数据转换Observable<Pair<T, Exception>>,但我不明白为什么它不起作用。

如果有人有的话,我真的很想看看另一种方法。

希望这可以帮助,

将要