rad*_*226 7 java concurrency multithreading asynchronous rx-java
我想知道是否可以使用RxJava库以便在以下用例中添加一些并发性:
String
从现有列中顺序获取列ResultSet
Observable
ResultSetObservable.create(resultSet)
InvokeWebServiceFunc1<String, Pair<String, Integer>>()
以获取相关的一些statistiques实例,例如)String
(请注意,String
在Pair
相同的一个输入通过)ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)
).所以这就是我所拥有的:
ResultSetObservable.create(resultSet)
.map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
.subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));
Run Code Online (Sandbox Code Playgroud)
它运作良好,但随着Web服务可能需要一些时间,一些的String
投入,我想添加一些并发有一个线程池类似行为的映射(10个线程的例子),但我需要的ExportAsCSVAction0
,以在被称为相同的线程(实际上当前的线程将是完美的).
你能帮我么?我无法想象在这里使用toBlocking().forEach()
模式是否是正确的解决方案,我不明白在哪里使用Schedulers.from(fixedThreadPool)
(在observeOn()
或中subscribeOn()
).
感谢您的任何帮助!
rad*_*226 20
我自己找到了!
package radium.rx;
import java.util.List;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;
public class TryRx {
public static Random RANDOM = new Random();
public static void main(String[] arguments) throws Throwable {
final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
Iterable<Integer> outputs = Observable.<Integer>from(inputs)
.flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor)))
.toBlocking()
.toIterable();
for (Integer output : outputs) {
System.out.println(output);
}
threadPoolExecutor.shutdown();
}
public static void sleepQuietly(int duration, TimeUnit unit) {
try {
Thread.sleep(unit.toMillis(duration));
} catch (InterruptedException e) {
}
}
public static Observable<Integer> deferHeavyWeightStuff(final int input) {
return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
}
public static int randomInt(int min, int max) {
return RANDOM.nextInt((max - min) + 1) + min;
}
public static int doHeavyWeightStuff(int input) {
sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
int output = (int) Math.pow(input, 2);
return output;
}
}
Run Code Online (Sandbox Code Playgroud)
干杯!