Mar*_*inS 4 multithreading android observable rx-java2
我有一个调用Web服务的方法,我认为该方法在IO线程上运行,直到服务停止并且UI冻结为止。
所以我开始了一些简单的测试来检查线程
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
是否导致以下输出表明尽管进行了订阅和观察,但一切仍在主线程上运行?
Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main
Run Code Online (Sandbox Code Playgroud)
但是如果我将observeOn移到.subscribeWith之前,如下所示...
public void test() {
disposableRx.add(
Observable.just(1, 2)
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("Emitting item on: " + Thread.currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
System.out.println("Processing item on: " + Thread.currentThread().getName());
return integer * 2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("Consuming item on: " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
})
);
}
Run Code Online (Sandbox Code Playgroud)
即使在阅读了许多有关RxJava的博客之后,我正在寻找的输出也是我一直要困惑的。
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main
Run Code Online (Sandbox Code Playgroud)
我已经剥离了原始方法,直到它几乎是Blog帖子中示例方法的副本为止
这意味着这应该在IO线程上运行loadPerson(),并在主线程上运行。没有。
disposableRx.add(
repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onSuccess(@NonNull String response) {
loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
isLoading.setValue(false);
}
@Override
public void onError(@NonNull Throwable e) {
loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
isLoading.setValue(false);
}
@Override
public void onComplete() {
}
})
);
Run Code Online (Sandbox Code Playgroud)
从我的方法中转出线程是否表明它正在主线程上运行?
是什么原因造成的?
该订单中,你把observeOn()和sunbscribeOn(),和其他运营商是非常重要的。
subscribeOn()运算符告诉源Observable在哪个线程上发射和转换项目。
请注意将observeOn()运算符放在哪里,因为它会更改执行工作的线程!在大多数情况下,您可能希望将切换到观察线程的时间延迟到Rx链的最末端。
Observable.just("long", "longer", "longest")
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.map(String::length)
.filter(length -> length == 6)
.subscribe(length -> System.out.println("item length " + length));
Run Code Online (Sandbox Code Playgroud)这里的映射,过滤和消耗是在主线程中执行的
observeOn()之前map()
没有理由在map()运算符上方应用observeOn()运算符。实际上,此代码将导致NetworkOnMainThreadException!我们不想从主线程上的HTTP响应中读取数据?—应该在切换回主线程之前完成。
您也可以使用多个observeOn()来切换线程,例如本示例。
Observable.just("long", "longer", "longest")
.doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(String::toString)
.doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.io())
.map(String::toString)
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
Run Code Online (Sandbox Code Playgroud)输出:
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1
Run Code Online (Sandbox Code Playgroud)
请注意,
根据此答案 subscribeOn()并不适用于下游运算符,因此不能保证您的操作将在其他线程上进行。
subscribeOn效果向上游传播,并更接近事件的来源。
至于你的问题,我做了一个测试,这是结果
private void testRxJava2Async() {
io.reactivex.Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());
return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
Run Code Online (Sandbox Code Playgroud)
测试结果 :
callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main
Run Code Online (Sandbox Code Playgroud)
相信我我理解你的困惑,让我一步一步解释。请记住,对于每个订阅都应该遵守这一点。我们都知道所有订阅都应该在工作线程(IO)上执行,而在主线程(UI)上执行观察者。在第一个例子中,订阅被触发,然后你说,对于这个订阅,所有更新都将在主线程上传递,无论在任何转换期间是否有更多订阅。但是,在第二个示例中,这就是您所说的,仅当订阅上的地图转换被转换然后在主线程上运行时才观察。把它想象成编程中的 Pre 和 Post 增量。
希望这是有道理的。RX 框架很棒。但实际上需要现场练习才能正确。你有没有亲眼见过。
为了确保您的更改在主线程上推送和执行,您需要做的是添加一个中间步骤来观察更改。
Integer[] list = {6,3,2,1};
Observable.just(list).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(value = > value * 2)
.observeOn(Schedulers.mainThread())
.subscribeWith(...)
Run Code Online (Sandbox Code Playgroud)
这个想法是subscribeOn只接受开始处理,例如 NetworkRequest 但不保证值将被推送到同一个线程。observeOn说嘿,我可以收到您最初订阅的那些值。因此,为了确保在主线程上转换值,您可以观察另一个普通线程上的变化,执行操作(Map| Flatmap 等),然后观察这些变化,这将保证只有那些值被放置在主线程上。基本上你说的是这个,嘿,在这些线程中执行所有繁重的处理,但是嘿,每当计算完成时,将这些值传递给主线程中的我以继续我的工作,只有这样我才能接受,因此没有工作永远在主线程上完成。
| 归档时间: |
|
| 查看次数: |
2993 次 |
| 最近记录: |