sdg*_*sdh 2 java rx-java rx-java2
有时我想触发 aRunnable作为我的Observable序列的一部分,但Runnable不报告进度。
我编写了一个简单的工厂,用于将Runnable对象包装成一个Observable:
public static <T> Observable<T> fromRunnable(final Runnable action) {
if (action == null) {
throw new NullPointerException("action");
}
return Observable.fromPublisher(subscriber -> {
try {
action.run();
subscriber.onComplete();
} catch (final Throwable throwable) {
subscriber.onError(throwable);
}
});
}
Run Code Online (Sandbox Code Playgroud)
用法:
Observable.concat(
someTask,
MoreObservables.fromRunnable(() -> {
System.out.println("Done. ");
}));
Run Code Online (Sandbox Code Playgroud)
但是 RxJava 2 已经提供了这个功能吗?
没有这样的工厂方法Observable,但 Completable可以从Runnable. 因此,您可以先创建一个Completable,然后将其转换为Observable:
Observable.concat(
someTask,
Completable.fromRunnable(() -> {
System.out.println("Done");
}).toObservable()
);
Run Code Online (Sandbox Code Playgroud)
更新:处理异常
Completable.fromRunnable内部从它的内部捕获异常Runnable并将它们作为onError排放推入流中。但是,如果您使用的是 Java,则必须自己处理run()方法内部的已检查异常。为了避免这种情况,您可以使用Callable而不是Runnable,因为它的call()方法签名声明它可以抛出异常。Completable.fromCallable()也将异常包装到onError排放中:
Observable.concat(
someTask,
Completable.fromCallable(() -> {
System.out.println("Done");
return null;
}).toObservable()
);
Run Code Online (Sandbox Code Playgroud)
也Callable可用于创建Observable或Single带有单个项目发射。
PS查看源代码,这些方法非常简单。
PPS Kotlin没有检查异常;)
更新 2
还有fromAction用于创建Completable. 它接受Action对象。
类似于 Runnable 的功能接口,但允许抛出已检查的异常。
所以代码可以简化为:
Observable.concat(
someTask,
Completable.fromAction(() -> {
System.out.println("Done");
}).toObservable()
);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3176 次 |
| 最近记录: |