如何在RxJava2中并行执行使用者?

Cor*_*ral 3 java multithreading rx-java rx-java2

我有一个关于RxJava2的问题.我想在固定线程池的不同线程中运行使用者以并行执行List结果.这是我的代码:

    List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g");
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size());
    Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() {
        @Override
        public void accept(String data) throws Exception {
            System.out.println(data + " forEach, thread is " + Thread.currentThread().getName());
        }
    });
Run Code Online (Sandbox Code Playgroud)

我得到的结果是:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-1
c forEach, thread is pool-1-thread-1
d forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-1
f forEach, thread is pool-1-thread-1
g forEach, thread is pool-1-thread-1
Run Code Online (Sandbox Code Playgroud)

但实际上我想要的是这个结果,每个消费者并行执行不同的线程:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-2
c forEach, thread is pool-1-thread-3
d forEach, thread is pool-1-thread-4
e forEach, thread is pool-1-thread-5
f forEach, thread is pool-1-thread-6
g forEach, thread is pool-1-thread-7
Run Code Online (Sandbox Code Playgroud)

有人能告诉我如何实现它吗?

Zap*_*dot 5

为了读取并行线程中的项目,使用Flowable <>而不是Observable,因为它提供并行运算符.例如:

 Flowable.fromIterable(letters)
         .parallel(letters.size())
         .runOn(Schedulers.from(fixedThreadPool))
         .sequential()
         .forEach(data -> System.out.println(data + " forEach, thread is " + 
                          Thread.currentThread().getName()));
Run Code Online (Sandbox Code Playgroud)

由于无法预测每个调用将使用哪个线程,因此输出可能会有所不同.在我的测试用例中,我得到了

c forEach, thread is pool-1-thread-3
g forEach, thread is pool-1-thread-7
a forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-5
d forEach, thread is pool-1-thread-4
b forEach, thread is pool-1-thread-2
f forEach, thread is pool-1-thread-6
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅RxJava Wiki的并行流部分

  • 不要使用`parallel()`来执行`.toFlowable(BackpressureStrategy.MISSING)`!另外,如果你有一个可迭代的,为什么不使用`Flowable.fromIterable`并且你获得免费的背压? (2认同)