Ben*_*ith 5 observable java-8 rx-java hystrix
我有一个Observable发出文件行(从GCS读取许多GB).
return StringObservable.byLine(
Observable.using(
() -> storage.get(blobId).reader(),
reader -> Observable.create(
new OnSubscribeReadChannel(reader, 64 * 1024)
),
ReadChannel::close
)
)
Run Code Online (Sandbox Code Playgroud)
每行产生多个(在某些情况下很多)调用各种DB,所有这些都包含在Hystrix命令中.显然这些线最终压倒了Hystrix命令,电路开始打开,每个人都有糟糕的一天.
这大致就是我正在做的事情:
readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
.map(this::deserializeLine)
.flatMap(this::addDataToObjectFromSomeDb)
.flatMap(this::writeObj)
.map(Set::size)
.reduce(0, (a, b) -> a + b)
.toBlocking().single()
Run Code Online (Sandbox Code Playgroud)
有没有办法可以应用一些背压,或限制一次处理的线数?
你需要使用Emitter.BackpressureMode.BUFFER
BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.
Run Code Online (Sandbox Code Playgroud)
http://reactivex.io/RxJava/1.x/javadoc/index.html?rx/Emitter.BackPressureMode.html
| 归档时间: |
|
| 查看次数: |
80 次 |
| 最近记录: |