使用 RXJava 执行异步任务的推荐方法

arr*_*dev 6 java asynchronous rx-java

我是 RxJava 的新手,我试图了解异步执行长时间运行的任务(例如网络请求)的最佳/推荐方法。我在网上阅读了很多例子,但希望得到一些反馈。

以下代码有效(它打印“一”、“二”,然后是“用户:x”……等等)但我真的应该手动创建/管理线程吗?

提前致谢!

public void start() throws Exception {
    System.out.println("one");
    observeUsers()
        .flatMap(users -> Observable.from(users))
        .subscribe(user -> System.out.println(String.format("User: %s", user.toString()));
    System.out.println("two");
}

Observable<List<User>> observeUsers() {
    return Observable.<List<User>>create(s -> {
        Thread thread = new Thread(() -> getUsers(s));
        thread.start();
    });
}

void getUsers(final Subscriber s) {
    s.onNext(userService.getUsers());
    s.onCompleted();
}

// userService.getUsers() fetches users from a web service.
Run Code Online (Sandbox Code Playgroud)

kjo*_*nes 4

尝试使用运算符,而不是管理自己的线程defer()。意思是替换observeUsers()Observable.defer(() -> Observable.just(userService.getUsers())). 然后,您可以使用RxJava Schedulers来控制订阅和观察期间使用哪些线程。这是根据上述建议修改的代码。

Observable.defer(() -> Observable.just(userService.getUsers()))
        .flatMap(users -> Observable.from(users))
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.trampoline())
        .subscribe(user -> System.out.println(String.format("User: %s", user.toString()));
Run Code Online (Sandbox Code Playgroud)