akh*_*khy 2 reactive-programming rx-java
我有一大串需要针对远程 API 进行检查的字符串。
Observable.from(List<String> strings) // let's say the `strings` has > 5000 items
.buffer(50) // splitting the strings into 50-sized chunks, it returns Observable<List<String>> (fast)
.flatMap((strings) -> {
// checkPhoneNumbers is a network call using Retrofit's RxJava (slow)
return mSyncApi.checkPhoneNumbers(strings);
})
.reduce( ... ) // aggregate all checking results
.subscribe( ... );
Run Code Online (Sandbox Code Playgroud)
问题buffer()似乎是发射List<String>太快,以至于.checkPhoneNumbers()几乎同时执行所有多个。我想实现的是排队.checkPhoneNumbers()以更好地支持连接速度较慢的设备。
在List<String>预定义的时间间隔内限制发射是没有意义的,因为这对于具有闪电般快速连接的设备来说是一个缺点。我在 之后立即尝试了 RxJava serialize(),flatMap()但它似乎没有任何区别(虽然我不知道它是否正确使用serialize)。
任何替代方法表示赞赏!谢谢。
小智 5
正如@zsxwing 建议的那样,我认为maxConcurrent如果您试图限制flatMap.
例如:https : //gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78
private static void flatMapBufferedExampleAsync() {
final AtomicInteger total = new AtomicInteger();
Observable.range(0, 500000000)
.doOnNext(i -> total.incrementAndGet())
.buffer(100)
.doOnNext(i -> System.out.println("emit " + i))
.flatMap(i -> {
return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
// simulate computational work
try {
Thread.sleep(10);
} catch (Exception e) {
}
return item + " processed " + Thread.currentThread();
});
}, 2 /* limit concurrency to 2 */) // <--- note argument here
.toBlocking().forEach(System.out::println);
System.out.println("total emitted: " + total.get());
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2503 次 |
| 最近记录: |