Lin*_*yen · 5

Tags: postgresql, spring-webflux, spring-data-r2dbc, r2dbc, r2dbc-postgresql
Question:

I am trying to do a batch insert with R2DBC (PostgreSQL) using the code below:

```java
@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
        .flatMapMany(c -> Mono.from(c.beginTransaction())
            .thenMany(Flux.fromIterable(users)
                .map(u -> Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)")
                        .returnGeneratedValues("id")
                        .bind(0, u.getName())
                        .bind(1, u.getAge())
                        .bind(2, u.getSalary())
                        .execute()))
                .flatMap(result -> result)
                .map(result -> result.map((row, meta) -> row.get("id", Long.class)))
                .flatMap(Flux::from)
                .delayUntil(r -> c.commitTransaction())
                .doFinally(st -> c.close())));
}
```

The code executes the statements to insert the users into the database and then fetches the generated user IDs. It works as expected when the user list has at most 255 entries. When the list is larger (256 or more), the following exception occurs:
```
[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
    |_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
    at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```

When I tried to investigate what was going on, I saw that the exception is thrown from ReactorNettyClient.java. Its implementation is:
```java
public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
                                            Supplier<Boolean> isConnected) {

    return Flux.create(sink -> {

        Conversation conversation = new Conversation(takeUntil, sink);

        // ensure ordering in which conversations are added to both queues.
        synchronized (this.conversations) {
            if (this.conversations.offer(conversation)) {

                sink.onRequest(value -> onRequest(conversation, value));

                if (!isConnected.get()) {
                    sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    return;
                }

                Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                    }
                });

                sender.accept(requestMessages);
            } else {
                sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));
            }
        }
    });
}
```

The error occurs once the queue holds more than 255 pending conversations: `Queue.offer` then returns false, which causes the `RequestQueueException` to be thrown.
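The mechanism above can be illustrated with a plain-JDK sketch. The real driver uses its own bounded queue with a limit of 256; here a hypothetical `ArrayBlockingQueue` of capacity 4 stands in for it, to show why `offer()` starts returning false (and the driver raises `RequestQueueException`) as soon as more statements are submitted than the queue can hold:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueLimitDemo {
    public static void main(String[] args) {
        // Stand-in for the driver's bounded "conversations" queue
        // (capacity 4 here instead of the driver's 256).
        ArrayBlockingQueue<String> conversations = new ArrayBlockingQueue<>(4);

        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < 6; i++) {
            // offer() is non-blocking: it returns false instead of waiting
            // once the queue is full, which is exactly the branch that
            // produces "request queue limit is exceeded" in the driver.
            if (conversations.offer("statement-" + i)) {
                accepted++;
            } else {
                rejected++;
            }
        }
        System.out.println(accepted + " accepted, " + rejected + " rejected");
        // prints: 4 accepted, 2 rejected
    }
}
```

In the question's code, `flatMap` subscribes to up to 256 inner publishers at once (Reactor's default concurrency), so all of them try to enqueue a conversation before any earlier one completes and frees a slot.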
Sorry, I am not fluent in English. Please help me figure out what is happening and how to fix it. I want to batch-insert more than 100,000 records per request.

Thanks.
I ran into this exception today in a @Transactional method and was able to solve it in two ways:

- concatMap: switching to concatMap with prefetch set to Integer.MAX_VALUE solved the problem for me, and even improved heap consumption considerably.
- flatMap: if you really want to keep flatMap, you can set -Dreactor.bufferSize.small to a value higher than 256. But that is somewhat dangerous: testing locally, I ran into an OutOfMemoryError when trying to process 100,000 records at once.

Please use concatMap instead, and set prefetch to the maximum number of records you expect to handle.
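For reference, a minimal, untested sketch of the question's test() method rewritten along these lines. It assumes the same connectionFactory, User accessors, and table as in the question; the explicit subscription to close() is an addition of this sketch (Connection.close() returns a Publisher and does nothing until subscribed), not something the answer prescribes:

```java
@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
        .flatMapMany(c -> Mono.from(c.beginTransaction())
            .thenMany(Flux.fromIterable(users)
                // concatMap subscribes to one inner publisher at a time, so
                // only one statement conversation is pending on the connection
                // instead of up to 256 with flatMap's default concurrency.
                .concatMap(u -> Flux.from(
                        c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)")
                            .returnGeneratedValues("id")
                            .bind(0, u.getName())
                            .bind(1, u.getAge())
                            .bind(2, u.getSalary())
                            .execute()), Integer.MAX_VALUE)
                .concatMap(result -> Flux.from(result.map((row, meta) -> row.get("id", Long.class))))
                .delayUntil(r -> c.commitTransaction())
                // close() must be subscribed to actually release the connection.
                .doFinally(st -> Mono.from(c.close()).subscribe())));
}
```

Since concatMap processes the statements sequentially anyway, the queue limit is never hit regardless of the list size; prefetch only bounds how many upstream users are requested in advance.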
Viewed: 3,415 times