How to handle "io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer" in WebClient with Spring Boot

use*_*741 6 java spring spring-boot spring-webflux spring-webclient

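I have 579,000 pending messages in ActiveMQ that I need to post to a REST API. While doing so, I hit the REST API asynchronously at about 3,000 requests per second. After some time, every request starts failing with the exception shown below: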

2021:06:10 05:25:40.002 [DefaultMessageListenerContainer-58047] [INFO] [com.main.consumer.AmazonMQConsumer] - Recieved TOPIC MESSAGE: DUMMY_MSG
2021:06:10 05:25:40.004 [reactor-http-epoll-6] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM895fb, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
2021:06:10 05:25:40.173 [reactor-http-epoll-7] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMMf5600, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
2021:06:10 05:25:40.365 [reactor-http-epoll-9] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM9223f, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
2021:06:10 05:25:41.966 [reactor-http-epoll-8] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMMb5cf1, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
2021:06:10 05:25:43.419 [reactor-http-epoll-8] [WARN] [reactor.netty.http.client.HttpClientConnect] - [id: 0DUMM7fa0a, L:/DUMMY_IP:DUMMY_PORT - R:DUMMY-URL.net/DUMMY_IP':DUMMY_PORT'] The connection observed an error
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
2021:06:10 05:25:43.419 [reactor-http-epoll-8] [WARN] [io.netty.channel.AbstractChannelHandlerContext] - An exception 'reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to POST https://DUMMY-URL.net/api/v1/evt?p=.....so on
    Stack trace:

Asynchronous REST API call code:

@Service
public class DummyConnectionService {
    @Value("${web.client.retry.max-attempts}")
    private String maxAttempts;

    @Value("${web.client.retry.min-backoff}")
    private String minBackoff;

    @Value("${Dummy.base-url}")
    private String DummyBaseUrl;

    @Value("${Dummy.api-path}")
    private String DummyApiPath;

    @Autowired
    WebClient.Builder webClientBuilder;

    public Mono<String> sendDataToDummy(String query, final String eventData) throws Exception {
        Mono<String> response = webClientBuilder.baseUrl(DummyBaseUrl)
                .build()
                .post()
                .uri(uriBuilder -> uriBuilder
                        .path(DummyApiPath)
                        .query(query)
                        .build(eventData))
                .retrieve()
                .bodyToMono(String.class)
                .retryWhen(Retry.backoff(Long.parseLong(maxAttempts), Duration.ofSeconds(Long.parseLong(minBackoff)))); //maxAttempts = 3, minBackoff = 5

        return response;
    }
}

Code that calls sendDataToDummy():

Mono<String> response = DummyConnectionService.sendDataToDummy(postData.toString(), json);

response.subscribe(res -> {
    if(res.contains(STATUS_OK)) {
        log.info(res);
    } else {
        log.error(res);
    }
});
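One detail in the log relates directly to this call site: Reactor reports `ErrorCallbackNotImplemented` when `subscribe` is given only a success consumer and the `Mono` then ends with an error, which is what happens here once the three retries are exhausted. Reactor's `subscribe` has a two-argument overload that also takes an error consumer. The sketch below shows the same idea (handle the failure per request so that one failed send does not escape as an unhandled error) using only JDK classes so that it runs without Spring. `CallbackSketch`, `send` and `sendAndLog` are hypothetical names, and `send` is a stand-in that fails on purpose, not the real `sendDataToDummy()`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

final class CallbackSketch {
    // Placeholder value; the real STATUS_OK constant is not shown in the question.
    static final String STATUS_OK = "OK";

    // Hypothetical stand-in for sendDataToDummy(): payloads starting with "bad"
    // fail the way a reset connection would, everything else succeeds.
    static CompletableFuture<String> send(String payload) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (payload.startsWith("bad")) {
            result.completeExceptionally(new IOException("Connection reset by peer"));
        } else {
            result.complete(STATUS_OK + " " + payload);
        }
        return result;
    }

    // Handles both outcomes in one place. The returned future never completes
    // exceptionally, so a failed send is logged and the caller keeps going.
    static CompletableFuture<Boolean> sendAndLog(String payload) {
        return send(payload).handle((res, err) -> {
            if (err != null) {
                System.err.println("send failed for " + payload + ": " + err);
                return false;
            }
            boolean ok = res.contains(STATUS_OK);
            System.out.println((ok ? "ok: " : "unexpected response: ") + res);
            return ok;
        });
    }

    public static void main(String[] args) {
        // The failing payload in the middle does not prevent the last one from being sent.
        for (String payload : new String[] {"msg-1", "bad-msg", "msg-2"}) {
            sendAndLog(payload).join();
        }
    }
}
```

Whether a failed message should be logged and dropped, or put back on the queue for redelivery, depends on the application and is not decided by this sketch.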

Please help me understand why this exception occurs and how to handle it gracefully without stopping the program flow.
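Why the peer resets the connections cannot be determined from the log alone. At roughly 3,000 requests per second, one plausible factor (an assumption, not a diagnosis) is that the remote API, or something in front of it, closes connections under load. If so, capping the number of requests in flight may help. The following is a minimal JDK-only sketch of such a cap using a `Semaphore`. `BoundedSender`, `submit` and `runDemo` are hypothetical names, and the simulated call is a short sleep rather than a real HTTP request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class BoundedSender {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    BoundedSender(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the calling thread until a slot is free, then starts the call.
    // The slot is returned when the call completes, successfully or not.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> call) {
        permits.acquireUninterruptibly();
        maxObserved.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<T> started;
        try {
            started = call.get();
        } catch (RuntimeException e) {
            // Starting the call failed synchronously; surface it as a failed future.
            started = new CompletableFuture<>();
            started.completeExceptionally(e);
        }
        return started.whenComplete((result, error) -> {
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    int maxObserved() {
        return maxObserved.get();
    }

    // Sends `total` simulated requests and returns the highest number that
    // were in flight at the same time.
    static int runDemo(int total, int maxInFlight) {
        BoundedSender sender = new BoundedSender(maxInFlight);
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            final int n = i;
            pending.add(sender.submit(() -> CompletableFuture.supplyAsync(() -> {
                pause(2); // simulated network latency
                return "OK " + n;
            })));
        }
        pending.forEach(CompletableFuture::join);
        return sender.maxObserved();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + runDemo(50, 4));
    }
}
```

Because `submit` blocks the caller when the limit is reached, calling it from the message listener thread would also slow consumption from the queue to the pace the API can sustain. In a Reactor pipeline, a comparable limit is the concurrency argument of `Flux.flatMap(mapper, concurrency)`.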
