我正在使用项目反应器,我想执行以下操作:
@Override
public void run(ApplicationArguments args) {
Flux.from(KafkaReceiver.create(receiverOptions)
.receive()
.map(this::getObject)
.flatMap(this::iterateElasticWrites)
.flatMap(this::writeTheWholeObjectToS3)
).subscribe();
}
// What I'd like to do - but a non reactive code
private Publisher<MyObj> iterateElasticWrites(MyObj message) {
for (MyDoc file: message.getDocs()) {
writeElasticDoc(file.getText());
}
return Mono.just(message);
}
Run Code Online (Sandbox Code Playgroud)
我正在努力寻找与iterateElasticWrites反应堆项目中的等效项。我想对我的对象 ( MyObj) 执行迭代,并将其每个文档列表的元素反应性地写入 elasticsearch 中。