byy*_*yyk 2 java interrupted-exception rx-java rx-java2
在下面的代码中,当dispose()被调用时,发射器线程被中断(InterruptedException被抛出 sleep 方法)。
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
Run Code Online (Sandbox Code Playgroud)
从调试会话中我看到中断源FutureTask在处理期间被取消。在那里,将对照运行线程检查dispose()正在调用的线程,如果不匹配,则发射器被中断。由于我使用了计算,所以线程有所不同Scheduler。
有什么方法可以使处理不中断此类发射器,或者实际上应该如何处理?我在这种方法中看到的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 之前正常完成该操作onComplete()。
请参考2.0的不同之处-错误处理。
2.x 的一项重要设计要求是不应该吞掉任何 Throwable 错误。这意味着无法发出错误,因为下游的生命周期已经达到其最终状态,或者下游取消了即将发出错误的序列。
因此,您可以将所有内容包装在 try/catch 中并正确发出错误:
Observable<Integer> obs = Observable.create(emitter -> {
try {
// ...
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}
});
Run Code Online (Sandbox Code Playgroud)
或者设置一个全局错误消费者来忽略它:
RxJavaPlugins.setErrorHandler(e -> {
// ..
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
// ...
Log.warning("Undeliverable exception received, not sure what to do", e);
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3202 次 |
| 最近记录: |