Tho*_*Lee 5 java reactive-programming project-reactor
我有以下使用反应堆核心中的通量的反应代码:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
如您所见,我对流程的外部源 (FluxSink.OverflowStrategy.LATEST) 进行了背压处理。但是,我还想为我的进程配置背压到 redis (redisHashReactiveCommands.hmset(key, map)),因为它可能是比我的进程的外部源更大的瓶颈。我希望我需要为 redis 部分创建另一个通量并将其与这个通量链接,但是我如何实现这一点,因为 .flatMap 适用于单个项目而不是项目流?
另外,我也想将相同发出的项目存储到 Kafka 中,但是链接flapMap 似乎不起作用.. 是否有一种简单的方法将所有这些链接到一组函数调用中(外部源 -> 我的过程,我的进程-> redis,我的进程-> kafka)?
如果您对主序列中的结果对象不感兴趣,您可以将flatMap. 您还必须移动 subscribeOn 并登录到 flatMap 中,并将它们放在内部保存发布者上:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> Mono.when(
redisHashReactiveCommands.hmset(key, map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),
kafkaReactiveCommand.something(map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
))
//... this results in a Mono<Void>
.doOnComplete(() -> log.debug("Both redis and kafka completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
或者,如果您确定两个进程都会发出结果元素或错误,则可以Tuple2通过替换为 来when将这两个结果合并到 a 中zip。
| 归档时间: |
|
| 查看次数: |
5375 次 |
| 最近记录: |