我有以下使用反应堆核心中的通量的反应代码:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
如您所见,我对流程的外部源 (FluxSink.OverflowStrategy.LATEST) 进行了背压处理。但是,我还想为我的进程配置背压到 redis (redisHashReactiveCommands.hmset(key, map)),因为它可能是比我的进程的外部源更大的瓶颈。我希望我需要为 redis 部分创建另一个通量并将其与这个通量链接,但是我如何实现这一点,因为 .flatMap 适用于单个项目而不是项目流?
另外,我也想将相同发出的项目存储到 Kafka 中,但是链接flapMap 似乎不起作用.. 是否有一种简单的方法将所有这些链接到一组函数调用中(外部源 -> 我的过程,我的进程-> redis,我的进程-> kafka)?
我有以下代码:
Flux.create(sink -> ..., FluxSink.OverflowStrategy.LATEST)
...
.onBackpressureLatest()
.subscribe();
Run Code Online (Sandbox Code Playgroud)
FluxSink.OverflowStrategy.LATEST如果我指定vs. ,Flux 行为有什么区别.onBackpressureLatest()?
如果我指定,例如,两者FluxSink.OverflowStrategy.LATEST和.onBackpressureBuffer(),它会做什么?