如何在 RxJava 中处理 dispose 而不会出现 InterruptedException

byy*_*yyk 2 java interrupted-exception rx-java rx-java2

在下面的代码中,当dispose()被调用时,发射器线程被中断(InterruptedException被抛出 sleep 方法)。

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();
Run Code Online (Sandbox Code Playgroud)

从调试会话中我看到中断源FutureTask在处理期间被取消。在那里,将对照运行线程检查dispose()正在调用的线程,如果不匹配,则发射器被中断。由于我使用了计算,所以线程有所不同Scheduler

有什么方法可以使处理不中断此类发射器,或者实际上应该如何处理?我在这种方法中看到的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 之前正常完成该操作onComplete()

Gus*_*avo 5

请参考2.0的不同之处-错误处理

2.x 的一项重要设计要求是不应该吞掉任何 Throwable 错误。这意味着无法发出错误,因为下游的生命周期已经达到其最终状态,或者下游取消了即将发出错误的序列。

因此,您可以将所有内容包装在 try/catch 中并正确发出错误:

Observable<Integer> obs = Observable.create(emitter -> {
   try {
      // ...
   } catch (InterruptedException ex) {
      // check if the interrupt is due to cancellation
      // if so, no need to signal the InterruptedException
      if (!disposable.isDisposed()) {
         observer.onError(ex);
      }
   }
});
Run Code Online (Sandbox Code Playgroud)

或者设置一个全局错误消费者来忽略它:

RxJavaPlugins.setErrorHandler(e -> {
    // ..
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    // ...
    Log.warning("Undeliverable exception received, not sure what to do", e);
});
Run Code Online (Sandbox Code Playgroud)