bak*_*kua 3 multithreading android scheduler subject rx-java
我有一个以下课程,我作为一个单身人士持有:
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
Run Code Online (Sandbox Code Playgroud)
在活动中,我观察会话并对每次更改执行网络操作:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
Run Code Online (Sandbox Code Playgroud)
当我订阅会话存储时,主题io()立即发出,因为它是a BehaviourSubject并且订阅者执行mainThread().
当我sessionStore.set(new AnotherSession())在已经订阅它的时候打电话的时候会出现问题.IMO这应该执行init()在io()调度程序中定义的流.然而,发生的是流在subject.onNext()被调用的同一线程上执行.导致成NetworkOnMainThreadException,因为我在做一个网络操作flatMap().
我是否理解主题错了?我这样滥用他们吗?那么适当的解决方案是什么?
我也尝试用Observable.fromEmitter()in observe()方法替换整个主题方法,但令人惊讶的是输出结果非常相似.
请看看下面的部分从书" 响应式编程方面RxJava "
默认情况下,在Subject上调用onNext()会直接传播到所有Observer的onNext()回调方法.这些方法具有相同的名称并不奇怪.在某种程度上,在Subject上调用onNext()会在每个Subscriber上间接调用onNext().
让我们回顾一下:如果从Thread-1调用Subject上的onNext,它将从Thread-1调用onNext到订阅者.onSubscribe将被删除.
首先要做的事情是:订阅发生在哪个线程上:
retrofitService.getAThing()
Run Code Online (Sandbox Code Playgroud)
我会猜测,并说它是调用线程.这将是observeOn中描述的线程,它是Android-UI-Loop.
observeOn下的每个值都将从调度程序指定的Thread-a转移到Thread-b.UI-Loop上的observeOn应该在订阅之前发生.订阅中将接收的每个值都将位于UI-Loop上,这不会阻止UI线程或以异常结束.
Pease看一下示例代码和输出:
class SessionStore {
private Subject<String, String> subject;
public SessionStore() {
subject = BehaviorSubject.create("wurst").toSerialized();
}
public void set(String session) {
subject.onNext(session);
}
public Observable<String> observe() {
return subject
.asObservable()
.doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
.distinctUntilChanged();
}
}
@Test
public void name() throws Exception {
// init
SessionStore sessionStore = new SessionStore();
TestSubscriber testSubscriber = new TestSubscriber();
Subscription subscribe = sessionStore
.observe()
.flatMap(s -> {
return Observable.fromCallable(() -> {
System.out.println("flatMap Thread:: " + Thread.currentThread());
return s;
}).subscribeOn(Schedulers.io());
})
.doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
.observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
.subscribe(testSubscriber); // Do UI-Stuff in subscribe
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("123");
}).start();
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("345");
}).start();
boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(b);
}
Run Code Online (Sandbox Code Playgroud)
输出::
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1353 次 |
| 最近记录: |