far*_*rid 13 spring spring-mvc spring-data-elasticsearch
I am using the ReactiveElasticsearchClient from spring-data-elasticsearch 3.2.3 with spring-boot 2.2.0. After upgrading to spring-boot 2.2.2 I get org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144.
The suggested fix is to set spring.codec.max-in-memory-size, but I still get the same exception.
The full exception is below:
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
reactor.core.publisher.Flux.collect(Flux.java:3273)
org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
|_ Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
|_ Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
|_ Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
|_ Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
|_ checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
|_ Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
|_ Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
|_ Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
|_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
|_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
|_ Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
|_ Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
|_ Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
|_ Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
|_ Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Can anyone tell me what I am doing wrong, or is this a bug?
Thanks
jen*_*ram 31
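Using a plain reactive WebClient I ran into the same issue (going from 2.1.9 to 2.2.1). I had no luck setting spring.codec.max-in-memory-size, and later found a hint that this was not the way to go anyway:
... On the client side, the limit can be changed in WebClient.Builder.
(source, including a dead link :-S)
I have yet to find out where WebClient.Builder gets the default 256K limit from (1). However, the following enabled me to raise the buffer size limit to 16M: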
WebClient.builder()
    .…
    .exchangeStrategies(ExchangeStrategies.builder()
        .codecs(configurer -> configurer
            .defaultCodecs()
            .maxInMemorySize(16 * 1024 * 1024))
        .build())
    .build();
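To connect the numbers: the 262144 in the exception message is 256 * 1024 bytes, and the value passed to maxInMemorySize above is 16 * 1024 * 1024 bytes. The following is only a plain-Java arithmetic sketch and does not use Spring; the class name BufferSizes, the helper fits, and the 300000-byte response size are hypothetical and chosen for illustration.

```java
/** Arithmetic sketch only; BufferSizes and fits are hypothetical names, not Spring API. */
final class BufferSizes {
    // 256K, the value reported in the DataBufferLimitException message
    static final int DEFAULT_LIMIT_BYTES = 256 * 1024;
    // 16M, the value passed to maxInMemorySize(...) in the snippet above
    static final int RAISED_LIMIT_BYTES = 16 * 1024 * 1024;

    /** Returns true if a response body of the given size stays within the limit. */
    static boolean fits(long responseBytes, long limitBytes) {
        return responseBytes <= limitBytes;
    }

    public static void main(String[] args) {
        long responseBytes = 300_000; // hypothetical size of a bulk response body
        System.out.println(DEFAULT_LIMIT_BYTES);                      // 262144
        System.out.println(RAISED_LIMIT_BYTES);                       // 16777216
        System.out.println(fits(responseBytes, DEFAULT_LIMIT_BYTES)); // false
        System.out.println(fits(responseBytes, RAISED_LIMIT_BYTES));  // true
    }
}
```

A response only slightly larger than 256K is already enough to trigger the exception, which is plausible for the response to a large _bulk request like the one in the question.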
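So it seems to me (without knowing the intricacies of spring-data-elasticsearch) that if you can somehow get hold of the WebClient returned from the WebClientProvider, you should be able to mutate it to include the ExchangeStrategies from above.
Perhaps you can provide your own override of DefaultWebClientProvider along these lines (absolutely untested!):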
class MyDefaultWebClientProvider extends DefaultWebClientProvider {
    @Override
    public WebClient get(InetSocketAddress endpoint) {
        return super.get(endpoint)
            .mutate() // Obtain WebClient.Builder instance.
            .exchangeStrategies(ExchangeStrategies.builder()
                .codecs(configurer -> configurer
                    .defaultCodecs()
                    .maxInMemorySize(16 * 1024 * 1024))
                .build())
            .build();
    }
}
YMMV.
Update #1:
1) Now I have found it, and it explains why setting spring.codec.max-in-memory-size has no effect: the limit is hardcoded at 256K in the base class that all default codecs use, cf. BaseDefaultCodecs.
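For intuition about what the limit does, the stack trace in the question shows the check firing in LimitedDataBufferList.add / updateCount / raiseLimitException while response chunks are being collected. Below is a minimal plain-Java sketch of that kind of running byte-count check. It is not Spring's implementation: LimitedChunkList is a hypothetical class, it uses byte[] instead of DataBuffer, and it throws IllegalStateException instead of DataBufferLimitException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a size-limited buffer list; not Spring's actual code. */
final class LimitedChunkList {
    private final List<byte[]> chunks = new ArrayList<>();
    private final long maxBytes;
    private long byteCount;

    LimitedChunkList(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Adds a chunk, failing as soon as the running total exceeds the limit. */
    void add(byte[] chunk) {
        byteCount += chunk.length;
        if (byteCount > maxBytes) {
            throw new IllegalStateException(
                    "Exceeded limit on max bytes to buffer : " + maxBytes);
        }
        chunks.add(chunk);
    }

    long byteCount() {
        return byteCount;
    }

    public static void main(String[] args) {
        LimitedChunkList list = new LimitedChunkList(256 * 1024); // 262144 bytes
        list.add(new byte[200 * 1024]);     // 204800 bytes buffered, within the limit
        try {
            list.add(new byte[100 * 1024]); // running total 307200 exceeds 262144
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the failure depends on the total size of the response being aggregated in memory, not on any single chunk, so a larger limit has to reach the codecs of the WebClient that actually reads the response.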
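A few days ago I implemented the possibility to customize the WebClient; see the corresponding Jira issue. This will be available in Spring Data Elasticsearch 3.2.4 and is already in the current master branch.
The configuration code looks like this: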
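import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
            .connectedTo("localhost:9200") //
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs()
                        .maxInMemorySize(-1)) // -1 removes the limit (unlimited buffering)
                    .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}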
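As of Spring Boot 2.3.0, there is a dedicated configuration property for the Reactive Elasticsearch REST client.
You can use the following configuration property to set a specific memory limit for the client.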
spring.data.elasticsearch.client.reactive.max-in-memory-size=
The existing spring.codec.max-in-memory-size property is separate and only affects other WebClient instances in the application.