Spring Data R2DBC - 未考虑背压?

cam*_*err 6 spring-data project-reactor spring-data-r2dbc r2dbc

该线程是 Github 问题的延续: https://github.com/spring-projects/spring-data-r2dbc/issues/194

\n

语境:

\n

你好,

\n

我刚刚尝试了一个非常简单的示例,基于两个反应性存储库:

\n

给定 br一个 r2dbc crud 存储库,以及cr另一个 r2dbc crud 存储库:

\n
br.findAll()\n   .flatMap(br -> {\n      return cr.findById(br.getPropertyOne())\n               .doOnNext(c -> br.setProperty2(c))\n               .thenReturn(br);\n    })\n   .collectList().block();\n
Run Code Online (Sandbox Code Playgroud)\n

此代码示例永远不会完成(只有前 250 个左右的条目到达.collectList操作员)。经过一番挖掘后,onBackpressureXXX在后面添加了一些运算符findAll可以通过......删除元素或缓冲它们来“修复”问题。

\n

在这一点上,我的理解是,r2dbc 反应存储库不使用消费者反馈机制,这消除了 r2dbc 的大部分好处。

\n

我错了吗 ?有没有更好的方法来实现同样的目标?

\n

谢谢 !

\n
\n

@mp911de 的建议:

\n

作为一般规则,避免在另一个流处于活动状态时创建流(名言:不要交叉流)。

\n

如果您想获取相关数据,那么理想情况下将所有结果收集为列表,然后运行子查询。这样,初始响应流就会被消耗,并且连接可以自由地获取其他结果。

\n

像下面的代码片段这样的东西应该可以完成这项工作:

\n
br.findAll().collectList()\n        .flatMap(it -> {\n\n            List<Mono<Reference>> refs = new ArrayList<>();\n            for (Person p : it) {\n                 refs.add(cr.findById(br.getPropertyOne()).doOnNext(\xe2\x80\xa6));\n            }\n\n            return Flux.concat(refs).thenReturn(it);\n        });\n
Run Code Online (Sandbox Code Playgroud)\n
\n

但这消除了流式传输数据而不将其全部保留在内存中的好处(我的最后一步不是列出而是流式写入以输出到某个文件)。

\n

对这个有什么帮助吗?

\n

Cof*_*Pro 0

我们遇到了同样的问题,github 线程引导我走向正确的道路。除了寻找以下两件事之外,没有解决此问题的灵丹妙药:

  1. 在事务内运行的任何流
  2. 在同一范围内运行的多个流

尤其是最后一点,不容易找到。对于 OP 来说,afindAll()持续提供数据(流 1),aflatMap在读取每个发出的结果时同时修改数据(流 2)。

就我而言,情况有所不同。路由原因是执行了 a flatMapinside a flatMapPublisher当您传递一个参数时,可能很难找到它。假设您有以下代码:

@Transactional
public Mono<Render> saveInPostgres(final Long sequenceNumber,
                                   final Mono<SomeComputeResult> previousOperations) {  
        previousOperations.flatMap(computeResult -> {
            computeResult.updateSequenceNumber(sequenceNumber);
            Render render = computeResult.finalize();
            final Mono<Render> saved = renderRepository.save(entity);
            ...
        }
Run Code Online (Sandbox Code Playgroud)

我传递了一个Mono,然后通过 执行flatMap。更新序列号后,该方法将对象保存在存储库中。当然,导致 的所有操作Mono现在都会在您映射时执行。你在这里看不到的是:

  1. saveInPostgres本身被称为内部flatMap
  2. previousOperations.flatMap()运行时出错,即异常

这种行为对我们来说尤其痛苦,因为异常发生的时候,数据库上有锁,并且锁不断累积并耗尽我们的连接池,直到服务无响应。

解决问题的提示

所以我的 2 美分,如果你有Publishers 作为参数,删除它们。始终链接映射执行,而不是传递它们并可能在流中创建流。这与OP问题本质上是一样的。通过调用,collectList()他能够链接操作,因为findAll()它是在通过第二个对元素进行任何其他修改之前完成的flatMap