小编ukr*_*e10的帖子

如何迭代 Flux 中的对象并对其进行操作?

我正在使用项目反应器,我想执行以下操作:

    @Override
    public void run(ApplicationArguments args) {
        Flux.from(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(this::getObject)
                        .flatMap(this::iterateElasticWrites)
                        .flatMap(this::writeTheWholeObjectToS3)
        ).subscribe();
    }

    // What I'd like to do - but a non reactive code
    private Publisher<MyObj> iterateElasticWrites(MyObj message) {
        for (MyDoc file: message.getDocs()) {
            writeElasticDoc(file.getText());
        }
        return Mono.just(message);
    }
Run Code Online (Sandbox Code Playgroud)

我正在努力寻找与iterateElasticWrites反应堆项目中的等效项。我想对我的对象 ( MyObj) 执行迭代,并将其每个文档列表的元素反应性地写入 elasticsearch 中。

reactor project-reactor reactive-streams spring-webflux

4
推荐指数
1
解决办法
1万
查看次数