ben*_*nze 3 java multithreading rx-java reactivex
在用RxJava编写数据同步作业时,我发现了一个我无法解释的奇怪行为.我是RxJava的新手,非常感谢帮助.
简而言之,我的工作非常简单我有一个元素ID列表,我调用web服务来获取每个元素的ID,进行一些处理并进行多次调用以将数据推送到DB.数据加载比数据存储更快,因此我输入了OutOfMemory错误.
我的代码几乎看起来像"失败"测试,但后来做了一些测试我意识到删除线:
flatMap(dt -> Observable.just(dt))
Run Code Online (Sandbox Code Playgroud)
让它起作用.失败的测试输出清楚地表明未消耗的项目叠加,这导致OutOfMemory.工作测试输出显示生产者将始终等待消费者,因此这永远不会导致OutOfMemory.
public static class DataStore {
public Integer myVal;
public byte[] myBigData;
public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}
@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
Run Code Online (Sandbox Code Playgroud)
这种行为背后的解释是什么?如何在不删除Observable.just(dt)的情况下解决我的失败测试,在我的实例中是Observable.from(someListOfItme)
flatMap
默认情况下,合并无限量的源并通过应用没有maxConcurrent参数的特定lambda,您基本上可以解除上行链接,现在可以全速运行,从而压倒其他运算符的内部缓冲区.
归档时间: |
|
查看次数: |
596 次 |
最近记录: |