nto*_*rnl 6 cancellation rx-java
我Observable.create()用来创建一个observable来对调度程序执行一些工作(例如Schedulers.io(),然后返回结果)AndroidSchedulers.mainThread().
val subscription = observable<T> {
try {
// perform action synchronously
it.onNext(action.invoke(context, args))
it.onCompleted()
} catch (t: Exception) {
it.onError(t)
}
}.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
// handle result here
result.set(it)
},
{
// handle error here
errorHandler.handleTaskError(model, this, it)
},
{
// notify completed
model.completeTask(this)
}
)
Run Code Online (Sandbox Code Playgroud)
内部操作action.invoke()是同步的,可能是阻塞IO操作.当用户决定取消它时,我取消订阅observable:subscription.unsubscribe()
但是,I/O操作不会被中断.是否有任何rx-java API来中断操作?
当您致电时yourSubscription.unsubscribe();,Rx将拨打您的取消订阅代码.
这unsuscribe代码将是Subscription你能类add你来subscriber当您创建Observable.
Observable<Object> obs = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// perform unsubscription
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscriber.onNext(/**...*/);
subscriber.onCompleted();
}
Run Code Online (Sandbox Code Playgroud)
因此,在该unsubscribe方法中,如果您有办法,可以中断您的工作.
请注意,当您取消订阅Observable时或取消订阅Observable时,将调用取消订阅方法.(它将自行取消订阅)
编辑:考虑vladimir mironov评论
| 归档时间: |
|
| 查看次数: |
6459 次 |
| 最近记录: |