证明RxJava中的PublishSubject不是线程安全的

cor*_*vax -2 java multithreading thread-safety rx-java

文档中声明,RxJava中的PublishSubject不是线程安全的.好吧.

我试着寻找这样的例子,也试着自己构造一个例子来模拟竞争条件,从而得到非预期的结果.但我没能做到 :(

有人能提供一个证明PublishSubject不是线程安全的例子吗?

aka*_*okd 5

通常,人们会问为什么他们的代码会出现意外行为和/或崩溃,答案是:因为他们在Subject上并发地调用了onXXX方法:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Scheduler.Worker;
import rx.exceptions.MissingBackpressureException;
import rx.observers.AssertableSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.*;

/**
 * Demonstrates the effect of calling {@code onNext} on a subject from two
 * threads at the same time.
 *
 * <p>Both tests run the same scenario many times: a subscriber that has
 * requested a single item is attached to the subject, then the test thread and
 * a computation-scheduler worker each call {@code onNext(1)} as simultaneously
 * as possible. The scenario asserts that the subscriber received exactly one
 * value and then failed with {@link MissingBackpressureException}.
 *
 * <ul>
 *   <li>{@link #racy()} uses a plain {@link PublishSubject}; it is the
 *       demonstration case and is expected to fail intermittently when the two
 *       emissions interleave.</li>
 *   <li>{@link #nonRacy()} wraps the subject with {@code toSerialized()} and is
 *       expected to pass on every round.</li>
 * </ul>
 */
public class PublishSubjectRaceTest {

    /** How many times each test repeats the two-thread emission scenario. */
    private static final int ROUNDS = 1000;

    /** Upper bound on how long the test thread waits for the worker's emission. */
    private static final long WORKER_TIMEOUT_SECONDS = 5;

    /**
     * Races two {@code onNext} calls against an unserialized
     * {@link PublishSubject}.
     *
     * @throws Exception if the test thread is interrupted while waiting for the worker
     */
    @Test
    public void racy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < ROUNDS; i++) {
                raceTwoEmissions(worker, PublishSubject.<Integer>create());
            }
        } finally {
            worker.unsubscribe();
        }
    }

    /**
     * Races two {@code onNext} calls against a {@link PublishSubject} wrapped
     * with {@code toSerialized()}.
     *
     * @throws Exception if the test thread is interrupted while waiting for the worker
     */
    @Test
    public void nonRacy() throws Exception {
        Worker worker = Schedulers.computation().createWorker();
        try {
            for (int i = 0; i < ROUNDS; i++) {
                raceTwoEmissions(worker, PublishSubject.<Integer>create().toSerialized());
            }
        } finally {
            worker.unsubscribe();
        }
    }

    /**
     * Runs one round of the scenario: subscribes with an initial request of 1,
     * emits {@code 1} from both the calling thread and {@code worker} at the
     * same moment, then asserts the subscriber saw one value followed by a
     * {@link MissingBackpressureException}.
     *
     * @param worker  scheduler worker that performs the second emission
     * @param subject subject under test; a fresh instance is expected per round
     * @throws InterruptedException if interrupted while waiting for the worker
     * @throws AssertionError if the worker does not finish in time or the
     *         subscriber did not end up in the expected state
     */
    private static void raceTwoEmissions(Worker worker, Subject<Integer, Integer> subject)
            throws InterruptedException {
        // Two parties (this thread and the worker) must both arrive before either emits.
        AtomicInteger pending = new AtomicInteger(2);

        AssertableSubscriber<Integer> subscriber = subject.test(1);

        CountDownLatch workerDone = new CountDownLatch(1);

        worker.schedule(() -> {
            try {
                awaitOtherThread(pending);
                subject.onNext(1);
            } finally {
                // Always release the test thread, even if onNext throws,
                // so a failure surfaces as an assertion rather than a hang.
                workerDone.countDown();
            }
        });

        awaitOtherThread(pending);
        subject.onNext(1);

        if (!workerDone.await(WORKER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new AssertionError("worker did not finish its onNext call within "
                    + WORKER_TIMEOUT_SECONDS + " seconds");
        }

        subscriber.assertFailure(MissingBackpressureException.class, 1);
    }

    /**
     * Two-party spin barrier: announces this thread's arrival by decrementing
     * {@code pending} and, if the other thread has not arrived yet, spins until
     * it does.
     *
     * @param pending counter initialised to the number of participating threads
     */
    private static void awaitOtherThread(AtomicInteger pending) {
        if (pending.decrementAndGet() != 0) {
            while (pending.get() != 0) {
                // Busy-spin on purpose: a blocking wait would add wake-up latency
                // and make the two onNext calls less likely to overlap.
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)