可观察的发射器为HystrixCommands产生太大的压力

Ben*_*ith 5 observable java-8 rx-java hystrix

我有一个Observable发出文件行(从GCS读取许多GB).

return StringObservable.byLine(
    Observable.using(
        () -> storage.get(blobId).reader(),
            reader -> Observable.create(
                    new OnSubscribeReadChannel(reader, 64 * 1024)
                ),
            ReadChannel::close
    )
)
Run Code Online (Sandbox Code Playgroud)

每行产生多个(在某些情况下很多)调用各种DB,所有这些都包含在Hystrix命令中.显然这些线最终压倒了Hystrix命令,电路开始打开,每个人都有糟糕的一天.

这大致就是我正在做的事情:

readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
            .map(this::deserializeLine)
            .flatMap(this::addDataToObjectFromSomeDb)
            .flatMap(this::writeObj)
            .map(Set::size)
            .reduce(0, (a, b) -> a + b)
            .toBlocking().single()
Run Code Online (Sandbox Code Playgroud)

有没有办法可以应用一些背压,或限制一次处理的线数?

Lit*_*log 0

你需要使用Emitter.BackpressureMode.BUFFER

BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.
Run Code Online (Sandbox Code Playgroud)

http://reactivex.io/RxJava/1.x/javadoc/index.html?rx/Emitter.BackPressureMode.html