RxJava:如何在取消订阅时中断线程?

nto*_*rnl 6 cancellation rx-java

Observable.create()用来创建一个observable来对调度程序执行一些工作(例如Schedulers.io(),然后返回结果)AndroidSchedulers.mainThread().

val subscription = observable<T> {
        try {
            // perform action synchronously
            it.onNext(action.invoke(context, args))
            it.onCompleted()
        } catch (t: Exception) {
            it.onError(t)
        }
    }.subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    {
                        // handle result here
                        result.set(it)
                    },
                    {
                        // handle error here
                        errorHandler.handleTaskError(model, this, it)
                    },
                    {
                        // notify completed
                        model.completeTask(this)
                    }
            )
Run Code Online (Sandbox Code Playgroud)

内部操作action.invoke()是同步的,可能是阻塞IO操作.当用户决定取消它时,我取消订阅observable:subscription.unsubscribe()

但是,I/O操作不会被中断.是否有任何rx-java API来中断操作?

dwu*_*sen 9

当您致电时yourSubscription.unsubscribe();,Rx将拨打您的取消订阅代码.

这unsuscribe代码将是Subscription你能类add你来subscriber当您创建Observable.

Observable<Object> obs = Observable.create(subscriber -> {

      subscriber.add(new Subscription() {
            @Override
            public void unsubscribe() {
                 // perform unsubscription
            }

            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        });
      subscriber.onNext(/**...*/);
      subscriber.onCompleted();
}
Run Code Online (Sandbox Code Playgroud)

因此,在该unsubscribe方法中,如果您有办法,可以中断您的工作.

请注意,当您取消订阅Observable时或取消订阅Observable时,将调用取消订阅方法.(它将自行取消订阅)

编辑:考虑vladimir mironov评论