小编byy*_*yyk的帖子

如何在 RxJava 中处理 dispose 而不会出现 InterruptedException

在下面的代码中,当dispose()被调用时,发射器线程被中断(InterruptedException被抛出 sleep 方法)。

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();
Run Code Online (Sandbox Code Playgroud)

从调试会话中我看到中断源FutureTask在处理期间被取消。在那里,将对照运行线程检查dispose()正在调用的线程,如果不匹配,则发射器被中断。由于我使用了计算,所以线程有所不同Scheduler

有什么方法可以使处理不中断此类发射器,或者实际上应该如何处理?我在这种方法中看到的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 之前正常完成该操作onComplete() …

java interrupted-exception rx-java rx-java2

2
推荐指数
1
解决办法
3202
查看次数

标签 统计

interrupted-exception ×1

java ×1

rx-java ×1

rx-java2 ×1