cam*_*err 6 spring-data project-reactor spring-data-r2dbc r2dbc
该线程是 Github 问题的延续: https://github.com/spring-projects/spring-data-r2dbc/issues/194
\n语境:
\n你好,
\n我刚刚尝试了一个非常简单的示例,基于两个反应性存储库:
\n给定 br一个 r2dbc crud 存储库,以及cr另一个 r2dbc crud 存储库:
br.findAll()\n .flatMap(br -> {\n return cr.findById(br.getPropertyOne())\n .doOnNext(c -> br.setProperty2(c))\n .thenReturn(br);\n })\n .collectList().block();\nRun Code Online (Sandbox Code Playgroud)\n此代码示例永远不会完成(只有前 250 个左右的条目到达.collectList操作员)。经过一番挖掘后,onBackpressureXXX在后面添加了一些运算符findAll可以通过......删除元素或缓冲它们来“修复”问题。
在这一点上,我的理解是,r2dbc 反应存储库不使用消费者反馈机制,这消除了 r2dbc 的大部分好处。
\n我错了吗 ?有没有更好的方法来实现同样的目标?
\n谢谢 !
\n@mp911de 的建议:
\n作为一般规则,避免在另一个流处于活动状态时创建流(名言:不要交叉流)。
\n如果您想获取相关数据,那么理想情况下将所有结果收集为列表,然后运行子查询。这样,初始响应流就会被消耗,并且连接可以自由地获取其他结果。
\n像下面的代码片段这样的东西应该可以完成这项工作:
\nbr.findAll().collectList()\n .flatMap(it -> {\n\n List<Mono<Reference>> refs = new ArrayList<>();\n for (Person p : it) {\n refs.add(cr.findById(br.getPropertyOne()).doOnNext(\xe2\x80\xa6));\n }\n\n return Flux.concat(refs).thenReturn(it);\n });\nRun Code Online (Sandbox Code Playgroud)\n但这消除了流式传输数据而不将其全部保留在内存中的好处(我的最后一步不是列出而是流式写入以输出到某个文件)。
\n对这个有什么帮助吗?
\n我们遇到了同样的问题,github 线程引导我走向正确的道路。除了寻找以下两件事之外,没有解决此问题的灵丹妙药:
尤其是最后一点,不容易找到。对于 OP 来说,afindAll()持续提供数据(流 1),aflatMap在读取每个发出的结果时同时修改数据(流 2)。
就我而言,情况有所不同。路由原因是执行了 a flatMapinside a flatMap。Publisher当您传递一个参数时,可能很难找到它。假设您有以下代码:
@Transactional
public Mono<Render> saveInPostgres(final Long sequenceNumber,
final Mono<SomeComputeResult> previousOperations) {
previousOperations.flatMap(computeResult -> {
computeResult.updateSequenceNumber(sequenceNumber);
Render render = computeResult.finalize();
final Mono<Render> saved = renderRepository.save(entity);
...
}
Run Code Online (Sandbox Code Playgroud)
我传递了一个Mono,然后通过 执行flatMap。更新序列号后,该方法将对象保存在存储库中。当然,导致 的所有操作Mono现在都会在您映射时执行。你在这里看不到的是:
saveInPostgres本身被称为内部flatMappreviousOperations.flatMap()运行时出错,即异常这种行为对我们来说尤其痛苦,因为异常发生的时候,数据库上有锁,并且锁不断累积并耗尽我们的连接池,直到服务无响应。
解决问题的提示
所以我的 2 美分,如果你有Publishers 作为参数,删除它们。始终链接映射执行,而不是传递它们并可能在流中创建流。这与OP问题本质上是一样的。通过调用,collectList()他能够链接操作,因为findAll()它是在通过第二个对元素进行任何其他修改之前完成的flatMap。
| 归档时间: |
|
| 查看次数: |
858 次 |
| 最近记录: |