在RxJava流中添加线程池

rad*_*226 7 java concurrency multithreading asynchronous rx-java

我想知道是否可以使用RxJava库以便在以下用例中添加一些并发性:

  • 使用自定义(类似)String从现有列中顺序获取列ResultSetObservableResultSetObservable.create(resultSet)
  • 调用Web Service的每个值(与InvokeWebServiceFunc1<String, Pair<String, Integer>>()以获取相关的一些statistiques实例,例如)String(请注意,StringPair相同的一个输入通过)
  • 以CSV格式打印所有内容(带a ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)).

所以这就是我所拥有的:

ResultSetObservable.create(resultSet)
    .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
    .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));
Run Code Online (Sandbox Code Playgroud)

它运作良好,但随着Web服务可能需要一些时间,一些的String投入,我想添加一些并发有一个线程池类似行为的映射(10个线程的例子),但我需要ExportAsCSVAction0,以在被称为相同的线程(实际上当前的线程将是完美的).

你能帮我么?我无法想象在这里使用toBlocking().forEach()模式是否是正确的解决方案,我不明白在哪里使用Schedulers.from(fixedThreadPool)(在observeOn()或中subscribeOn()).

感谢您的任何帮助!

rad*_*226 20

我自己找到了!

package radium.rx;

import java.util.List;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;

public class TryRx {

    public static Random RANDOM = new Random();

    public static void main(String[] arguments) throws Throwable {
        final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);

        Iterable<Integer> outputs = Observable.<Integer>from(inputs)
                .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor)))
                .toBlocking()
            .toIterable();

        for (Integer output : outputs) {
            System.out.println(output);
        }

        threadPoolExecutor.shutdown();
    }

    public static void sleepQuietly(int duration, TimeUnit unit) {
        try {
            Thread.sleep(unit.toMillis(duration));
        } catch (InterruptedException e) {

        }
    }

    public static Observable<Integer> deferHeavyWeightStuff(final int input) {
        return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
    }

    public static int randomInt(int min, int max) {
        return RANDOM.nextInt((max - min) + 1) + min;
    }

    public static int doHeavyWeightStuff(int input) {
        sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
        int output = (int) Math.pow(input, 2);
        return output;
    }

}
Run Code Online (Sandbox Code Playgroud)

干杯!

  • 这在并行化时是可以预料的。 (2认同)