RxJava2 .subscribeOn .observeOn混乱。在主线程上运行

Mar*_*inS 4 multithreading android observable rx-java2

我有一个调用Web服务的方法,我认为该方法在IO线程上运行,直到服务停止并且UI冻结为止。

所以我开始了一些简单的测试来检查线程

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'


public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })

                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}
Run Code Online (Sandbox Code Playgroud)

是否导致以下输出表明尽管进行了订阅和观察,但一切仍在主线程上运行?

Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main
Run Code Online (Sandbox Code Playgroud)

但是如果我将observeOn移到.subscribeWith之前,如下所示...

public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}
Run Code Online (Sandbox Code Playgroud)

即使在阅读了许多有关RxJava的博客之后,我正在寻找的输出也是我一直要困惑的。

Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main
Run Code Online (Sandbox Code Playgroud)

我已经剥离了原始方法,直到它几乎是Blog帖子中示例方法的副本为止

像老板一样的多线程

这意味着这应该在IO线程上运行loadPerson(),并在主线程上运行。没有。

disposableRx.add(
        repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableMaybeObserver<String>() {

                    @Override
                    public void onSuccess(@NonNull String response) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onComplete() {

                    }
                })
);
Run Code Online (Sandbox Code Playgroud)

从我的方法中转出线程是否表明它正在主线程上运行?

是什么原因造成的?

Bis*_*Abd 7

订单中,你把observeOn()sunbscribeOn(),和其他运营商是非常重要的

  • subscribeOn()运算符告诉源Observable在哪个线程上发射和转换项目。

  • 请注意将observeOn()运算符放在哪里,因为它会更改执行工作的线程!在大多数情况下,您可能希望将切换到观察线程的时间延迟到Rx链的最末端。

    Observable.just("long", "longer", "longest")
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .map(String::length)
        .filter(length -> length == 6)
        .subscribe(length -> System.out.println("item length " + length));
    
    Run Code Online (Sandbox Code Playgroud)

这里的映射,过滤和消耗是在主线程中执行的

  • observeOn()之前map() 没有理由在map()运算符上方应用observeOn()运算符。实际上,此代码将导致NetworkOnMainThreadException!我们不想从主线程上的HTTP响应中读取数据?—应该在切换回主线程之前完成。

  • 您也可以使用多个observeOn()来切换线程,例如本示例。

     Observable.just("long", "longer", "longest")
    .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.computation())
    .map(String::toString)
    .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.io())
    .map(String::toString)
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
    
    Run Code Online (Sandbox Code Playgroud)

输出:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1
Run Code Online (Sandbox Code Playgroud)

请注意, 根据此答案 subscribeOn()并不适用于下游运算符,因此不能保证您的操作将在其他线程上进行。

subscribeOn 效果向上游传播,并更接近事件的来源。

至于你的问题,我做了一个测试,这是结果

   private void testRxJava2Async() {
 io.reactivex.Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {

            Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());

            return null;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new Observer<String>() {


                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {

                    Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}
Run Code Online (Sandbox Code Playgroud)

测试结果 :

callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main
Run Code Online (Sandbox Code Playgroud)


Rem*_*rio 5

相信我我理解你的困惑,让我一步一步解释。请记住,对于每个订阅都应该遵守这一点。我们都知道所有订阅都应该在工作线程(IO)上执行,而在主线程(UI)上执行观察者。在第一个例子中,订阅被触发,然后你说,对于这个订阅,所有更新都将在主线程上传递,无论在任何转换期间是否有更多订阅。但是,在第二个示例中,这就是您所说的,仅当订阅上的地图转换被转换然后在主线程上运行时才观察。把它想象成编程中的 Pre 和 Post 增量。

希望这是有道理的。RX 框架很棒。但实际上需要现场练习才能正确。你有没有亲眼见过。

为了确保您的更改在主线程上推送和执行,您需要做的是添加一个中间步骤来观察更改。

Integer[] list = {6,3,2,1};
 Observable.just(list).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(value = > value  * 2)
.observeOn(Schedulers.mainThread())
.subscribeWith(...)
Run Code Online (Sandbox Code Playgroud)

这个想法是subscribeOn只接受开始处理,例如 NetworkRequest 但不保证值将被推送到同一个线程。observeOn说嘿,我可以收到您最初订阅的那些值。因此,为了确保在主线程上转换值,您可以观察另一个普通线程上的变化,执行操作(Map| Flatmap 等),然后观察这些变化,这将保证只有那些值被放置在主线程上。基本上你说的是这个,嘿,在这些线程中执行所有繁重的处理,但是嘿,每当计算完成时,将这些值传递给主线程中的我以继续我的工作,只有这样我才能接受,因此没有工作永远在主线程上完成。