cor*_*vax -2 java multithreading thread-safety rx-java
据说RxJava中的PublishSubject不是线程安全的.好吧.
我试着寻找现成的例子,也试着自己构造例子来模拟会导致非预期结果的竞争条件,但都没有成功.
任何人都可以提供一个证明PublishSubject不是线程安全的例子吗?
通常,人们会问为什么他们的设置会出现意外行为和/或崩溃,答案是:因为他们在Subject上并发调用了onXXX方法:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.Scheduler.Worker;
import rx.exceptions.MissingBackpressureException;
import rx.observers.AssertableSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;
/**
 * Contrasts a plain {@link PublishSubject} with its serialized wrapper when
 * {@code onNext} is invoked from two threads at (as close as possible to) the
 * same moment.
 *
 * <p>Each round subscribes with an initial request of one item and then emits
 * two items, one from the test thread and one from a computation worker. The
 * round asserts that exactly one value was delivered and that the stream
 * terminated with a {@link MissingBackpressureException}.
 *
 * <p>{@link #racy()} runs the rounds against the raw subject and exists to
 * expose the consequences of concurrent {@code onNext} calls; {@link #nonRacy()}
 * runs the same rounds against {@code toSerialized()}.
 */
public class PublishSubjectRaceTest {

    /** How many times each scenario is repeated to give the race a chance to show up. */
    private static final int ROUNDS = 1000;

    /**
     * Races two {@code onNext} calls on an unserialized {@link PublishSubject}.
     *
     * @throws Exception if the test thread is interrupted while waiting for the worker
     */
    @Test
    public void racy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < ROUNDS; i++) {
                raceTwoEmissions(worker, PublishSubject.<Integer>create());
            }
        } finally {
            worker.unsubscribe();
        }
    }

    /**
     * Races two {@code onNext} calls on a subject wrapped with {@code toSerialized()}.
     *
     * @throws Exception if the test thread is interrupted while waiting for the worker
     */
    @Test
    public void nonRacy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < ROUNDS; i++) {
                raceTwoEmissions(worker, PublishSubject.<Integer>create().toSerialized());
            }
        } finally {
            worker.unsubscribe();
        }
    }

    /**
     * Runs one round: subscribes with a request of 1, emits the value {@code 1}
     * from both the calling thread and {@code worker}, then asserts the outcome.
     *
     * @param worker  worker on which the second emission is scheduled
     * @param subject the subject under test; a fresh instance is expected per round
     * @throws InterruptedException if interrupted while waiting for the worker to finish
     */
    private static void raceTwoEmissions(Worker worker, Subject<Integer, Integer> subject)
            throws InterruptedException {
        // Two parties (test thread + worker) must check in before either emits.
        AtomicInteger pending = new AtomicInteger(2);
        AssertableSubscriber<Integer> subscriber = subject.test(1);
        CountDownLatch workerDone = new CountDownLatch(1);

        worker.schedule(() -> {
            try {
                awaitOtherParty(pending);
                subject.onNext(1);
            } finally {
                // Always release the test thread, even if onNext throws here;
                // otherwise the await() below would block forever.
                workerDone.countDown();
            }
        });

        awaitOtherParty(pending);
        subject.onNext(1);
        workerDone.await();

        subscriber.assertFailure(MissingBackpressureException.class, 1);
    }

    /**
     * Spin barrier for two threads: the first arrival busy-waits until the
     * second one has also decremented the counter, so that both proceed to
     * their {@code onNext} call as simultaneously as possible.
     *
     * @param pending shared counter, initialised to the number of parties
     */
    private static void awaitOtherParty(AtomicInteger pending) {
        if (pending.decrementAndGet() != 0) {
            // Deliberate busy-spin (no parking) to keep the start window tight.
            while (pending.get() != 0) {
                // spin
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
857 次 |
最近记录: |