RXJava 的问题

Fat*_*onk 1 java android rx-java rx-java2 rx-java3

我正在改编来自三词地址的一些示例代码,以便通过 Java SDK 访问他们的 API。它使用 RXJava。

示例代码是:

Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });
Run Code Online (Sandbox Code Playgroud)

首先。这会在构建时给出弃用警告以及 IDE 警告 ( Result of 'Observable.subscribe()' is ignored)。

为了解决第一个问题,我Disposable myDisposable = Observable. 它是否正确?(添加位置见下文)

接下来,我需要添加超时,以便在请求超时时可以显示警告等。为此,我已添加.timeout(5000, TimeUnit.MILLISECONDS)到构建器中。

timeout这是可行的,但s 似乎对 s 起作用的方式Observable是它们抛出异常,而我不知道如何捕获和处理该异常。

我现在拥有的是:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
        });
Run Code Online (Sandbox Code Playgroud)

它构建并运行良好,并且不显示 API/弃用警告,但是当没有可用网络时,它会正确超时并引发未处理的异常。

所以,代码似乎是正确的,但是到底如何添加异常处理来捕获引发的超时呢TimeoutException

我已经尝试了很多事情,包括:try-catch在整个过程中添加一个子句Observable- 这警告说,TimeoutException“try;”中的代码不会抛出该错误。并添加错误处理程序。

添加错误处理程序已经让我最接近了,所以下面的代码是我所得到的:

Disposable myDisposable = Observable.fromCallable(() -> wrapper.convertTo3wa(new Coordinates(51.2423, -0.12423)).execute())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(5000, TimeUnit.MILLISECONDS)
        .subscribe(result -> {
            if (result.isSuccessful()) {
                Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
            } else {
                Log.e("MainActivity", result.getError().getMessage());
            }
         }, error -> {
             runOnUiThread(new Runnable() {
                 @Override
                 public void run() {
                     myTextView.setText(R.string.network_not_available);
                 }
             });
         });
Run Code Online (Sandbox Code Playgroud)

这可以正确捕获超时并更新我的 UI,不会出现错误,但是当网络恢复时,Observable 似乎可能会尝试返回并引发空指针异常。

(更新一下,无论网络是否恢复,这个NPE实际上有时会在短时间内抛出......但当网络恢复时它总是会抛出。)

我得到FATAL EXCEPTION: RxCachedThreadScheduler-1并且java.lang.NullPointerException: Callable returned a null value. Null values are generally not allowed in 3.x operators and sources.

我是否需要销毁Observable或其他东西来防止 NPE?

dan*_*ano 5

您需要onError在调用中添加一个处理程序subscribe

    .subscribe(result -> {
        if (result.isSuccessful()) {
            Log.i("MainActivity", String.format("3 word address: %s", result.getWords()));
        } else {
            Log.e("MainActivity", result.getError().getMessage());
        }
     },
     error -> {
         // handle error here
     });
Run Code Online (Sandbox Code Playgroud)

当异常发生到没有 onError 处理程序的订阅调用时,它将抛出 OnErrorNotImplementedException,如下所示:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
Run Code Online (Sandbox Code Playgroud)

添加 onError 处理程序将阻止这种情况发生,并且 onError 处理程序将被调用。