Android中的RxJava异步任务

mun*_*ikh 14 android android-asynctask rx-java

我试图在Android中使用RxJava实现异步任务.我尝试了以下代码,但它没有用.它在UI线程上执行.我使用的是以下版本的RxAndroid 0.24.0.

try {
            Observable.just(someMethodWhichThrowsException())
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(s -> onMergeComplete());
        } catch (IOException e) {
            e.printStackTrace();
        }
Run Code Online (Sandbox Code Playgroud)

但是,以下对我来说是异步的.

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    someMethodWhichThrowsException();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                subscriber.onCompleted();
            }
        });
        observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();
Run Code Online (Sandbox Code Playgroud)

我想了解以下内容:1.它们之间有什么区别?2.创建异步任务时的最佳做法是什么?

提前致谢.

Ada*_*m S 20

  1. 他们之间有什么区别?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())
Run Code Online (Sandbox Code Playgroud)

这相当于以下内容:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())
Run Code Online (Sandbox Code Playgroud)

如您所见,这使得同步方法首先调用,然后将其传递Observable.just给Observable.

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

但是,此方法将在订阅时在call块中运行代码.你已经告诉它你要订阅一个新线程(),所以订阅发生在一个新线程上,并且在订阅(块)上运行的代码也会在该线程上运行.这与调用类似.subscribeOn(Schedulers.newThread())callObservable.defer

  1. 创建异步任务时的最佳做法是什么?

嗯,这取决于你和你想要的行为.有时您希望异步代码立即开始运行(在这种情况下,您可能希望使用其中一个运算符来缓存它).我肯定会考虑使用Async Utils库.

其他时候,你会希望它只在订阅时运行(这是示例中的行为) - 例如,如果有副作用,或者如果你不关心它何时运行并且只想使用构建-ins从UI线程中获取一些东西.丹卢提到Observable.defer是一个转换的Rx期间采取的旧代码,并得到它关闭UI线程,非常方便.


cle*_*p6r 11

使用RxJava Async Utils库中的Async.start().这将调用您在另一个线程上提供的函数.

例:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

如您所知,已检查的异常必须包装到RuntimeExceptions中.

另请参见https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start

  • 关于`Async.start`的一个注释,它会立即开始工作,甚至在订阅之前,这有时正是你想要的.如果你想等到订阅,请使用`Async.toAction(...).call()`或`Observable.defer`.在这种情况下,`defer`将处理异常清理器,因为你可以`返回Observable.error(new MyException());`而不是把它扔在那里,尽管抛出的异常会产生相同的结果. (3认同)