如何从 Runnable 创建 Observable?

sdg*_*sdh 2 java rx-java rx-java2

有时我想触发 aRunnable作为我的Observable序列的一部分,但Runnable不报告进度。

我编写了一个简单的工厂,用于将Runnable对象包装成一个Observable

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

用法:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));
Run Code Online (Sandbox Code Playgroud)

但是 RxJava 2 已经提供了这个功能吗?

Mak*_*dov 5

没有这样的工厂方法Observable Completable可以从Runnable. 因此,您可以先创建一个Completable,然后将其转换为Observable

Observable.concat(
    someTask, 
    Completable.fromRunnable(() -> {
        System.out.println("Done");
    }).toObservable()
);
Run Code Online (Sandbox Code Playgroud)

更新:处理异常

Completable.fromRunnable内部从它的内部捕获异常Runnable并将它们作为onError排放推入流中。但是,如果您使用的是 Java,则必须自己处理run()方法内部的已检查异常。为了避免这种情况,您可以使用Callable而不是Runnable,因为它的call()方法签名声明它可以抛出异常。Completable.fromCallable()也将异常包装到onError排放中:

Observable.concat(
    someTask, 
    Completable.fromCallable(() -> {
        System.out.println("Done");
        return null;
    }).toObservable()
);
Run Code Online (Sandbox Code Playgroud)

Callable可用于创建ObservableSingle带有单个项目发射。

PS查看源代码,这些方法非常简单。

PPS Kotlin没有检查异常;)


更新 2

还有fromAction用于创建Completable. 它接受Action对象。

类似于 Runnable 的功能接口,但允许抛出已检查的异常。

所以代码可以简化为:

Observable.concat(
    someTask, 
    Completable.fromAction(() -> {
        System.out.println("Done");
    }).toObservable()
);
Run Code Online (Sandbox Code Playgroud)