带有webclient的功能区负载均衡器与其余模板不同(未正确平衡)

pix*_*xel 9 load-balancing resttemplate spring-cloud spring-cloud-netflix spring-webflux

我试过WebClientLoadBalancerExchangeFilterFunction:

WebClient 配置:

@Bean
public WebClient myWebClient(final LoadBalancerExchangeFilterFunction lbFunction) {
    return WebClient.builder()
            .filter(lbFunction)
            .defaultHeader(ACCEPT, APPLICATION_JSON_VALUE)
            .defaultHeader(CONTENT_ENCODING, APPLICATION_JSON_VALUE)
            .build();
} 
Run Code Online (Sandbox Code Playgroud)

然后我注意到对底层服务的调用没有正确地进行负载平衡 - 每个实例上的RPS存在恒定的差异.

然后我试着回去RestTemplate.它工作正常.

配置RestTemplate:

private static final int CONNECT_TIMEOUT_MILLIS = 18 * DateTimeConstants.MILLIS_PER_SECOND;
private static final int READ_TIMEOUT_MILLIS = 18 * DateTimeConstants.MILLIS_PER_SECOND;

@LoadBalanced
@Bean
public RestTemplate restTemplateSearch(final RestTemplateBuilder restTemplateBuilder) {
    return restTemplateBuilder
            .errorHandler(errorHandlerSearch())
            .requestFactory(this::bufferedClientHttpRequestFactory)
            .build();
}

private ClientHttpRequestFactory bufferedClientHttpRequestFactory() {
    final SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    requestFactory.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
    requestFactory.setReadTimeout(READ_TIMEOUT_MILLIS);
    return new BufferingClientHttpRequestFactory(requestFactory);
}

private ResponseErrorHandler errorHandlerSearch() {
    return new DefaultResponseErrorHandler() {
        @Override
        public boolean hasError(ClientHttpResponse response) throws IOException {
            return response.getStatusCode().is5xxServerError();
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

使用WebClient配置到11:25进行负载均衡,然后切换回RestTemplate:

网络客户端-VS-休息模板 - 负载均衡

是否存在这样的差异以及如何WebClient在每个实例上使用相同数量的RPS?线索可能是旧实例获得的请求多于新实例.

我已经尝试了一些调试,并且ZoneAwareLoadBalancer正在调用相同的(默认类似)逻辑.

Jar*_*ela 5

我做了简单的 POC,一切都与 web 客户端和默认配置的休息模板完全相同。

休息服务器代码:

@SpringBootApplication
internal class RestServerApplication

fun main(args: Array<String>) {
    runApplication<RestServerApplication>(*args)
}

class BeansInitializer : ApplicationContextInitializer<GenericApplicationContext> {
    override fun initialize(context: GenericApplicationContext) {
        serverBeans().initialize(context)
    }
}

fun serverBeans() = beans {
    bean("serverRoutes") {
        PingRoutes(ref()).router()
    }
    bean<PingHandler>()
}

internal class PingRoutes(private val pingHandler: PingHandler) {
    fun router() = router {
        GET("/api/ping", pingHandler::ping)
    }
}

class PingHandler(private val env: Environment) {
    fun ping(serverRequest: ServerRequest): Mono<ServerResponse> {
        return Mono
            .fromCallable {
                // sleap added to simulate some work
                Thread.sleep(2000)
            }
            .subscribeOn(elastic())
            .flatMap {
                ServerResponse.ok()
                    .syncBody("pong-${env["HOSTNAME"]}-${env["server.port"]}")
            }
    }
}
Run Code Online (Sandbox Code Playgroud)

application.yaml添加:

context.initializer.classes: com.lbpoc.server.BeansInitializer
Run Code Online (Sandbox Code Playgroud)

gradle 中的依赖项:

implementation('org.springframework.boot:spring-boot-starter-webflux')
Run Code Online (Sandbox Code Playgroud)

休息客户端代码:

@SpringBootApplication
internal class RestClientApplication {
    @Bean
    @LoadBalanced
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder()
    }

    @Bean
    @LoadBalanced
    fun restTemplate() = RestTemplateBuilder().build()
}

fun main(args: Array<String>) {
    runApplication<RestClientApplication>(*args)
}

class BeansInitializer : ApplicationContextInitializer<GenericApplicationContext> {
    override fun initialize(context: GenericApplicationContext) {
        clientBeans().initialize(context)
    }
}

fun clientBeans() = beans {
    bean("clientRoutes") {
        PingRoutes(ref()).router()
    }
    bean<PingHandlerWithWebClient>()
    bean<PingHandlerWithRestTemplate>()
}

internal class PingRoutes(private val pingHandlerWithWebClient: PingHandlerWithWebClient) {
    fun router() = org.springframework.web.reactive.function.server.router {
        GET("/api/ping", pingHandlerWithWebClient::ping)
    }
}

class PingHandlerWithWebClient(private val webClientBuilder: WebClient.Builder) {
    fun ping(serverRequest: ServerRequest) = webClientBuilder.build()
        .get()
        .uri("http://rest-server-poc/api/ping")
        .retrieve()
        .bodyToMono(String::class.java)
        .onErrorReturn(TimeoutException::class.java, "Read/write timeout")
        .flatMap {
            ServerResponse.ok().syncBody(it)
        }
}

class PingHandlerWithRestTemplate(private val restTemplate: RestTemplate) {
    fun ping(serverRequest: ServerRequest) = Mono.fromCallable {
        restTemplate.getForEntity("http://rest-server-poc/api/ping", String::class.java)
    }.flatMap {
        ServerResponse.ok().syncBody(it.body!!)
    }
}
Run Code Online (Sandbox Code Playgroud)

application.yaml添加:

context.initializer.classes: com.lbpoc.client.BeansInitializer
spring:
  application:
    name: rest-client-poc-for-load-balancing
logging:
  level.org.springframework.cloud: DEBUG
  level.com.netflix.loadbalancer: DEBUG
rest-server-poc:
  listOfServers: localhost:8081,localhost:8082
Run Code Online (Sandbox Code Playgroud)

gradle 中的依赖项:

implementation('org.springframework.boot:spring-boot-starter-webflux')
implementation('org.springframework.cloud:spring-cloud-starter-netflix-ribbon')
Run Code Online (Sandbox Code Playgroud)

您可以尝试使用两个或多个服务器实例,它与 Web 客户端和其余模板的工作方式完全相同。

默认情况下使用功能区 zoneAwareLoadBalancer,如果您只有一个区域,则服务器的所有实例都将在“未知”区域中注册。

您可能在通过 Web 客户端保持连接时遇到问题。Web 客户端在多个请求中重用相同的连接,其余模板不这样做。如果您的客户端和服务器之间有某种代理,那么您可能会遇到 Web 客户端重用连接的问题。要验证它,您可以像这样修改 Web 客户端 bean 并运行测试:

@Bean
@LoadBalanced
fun webClientBuilder(): WebClient.Builder {
    return WebClient.builder()
        .clientConnector(ReactorClientHttpConnector { options ->
            options
                .compression(true)
                .afterNettyContextInit { ctx ->
                    ctx.markPersistent(false)
                }
        })
}
Run Code Online (Sandbox Code Playgroud)

当然,这对于生产来说不是一个好的解决方案,但是这样做您可以检查您的客户端应用程序内部的配置是否有问题,或者问题是否在外部,客户端和服务器之间的问题。例如,如果您使用 kubernetes 并使用服务器节点 IP 地址在服务发现中注册您的服务,那么对此类服务的每次调用都将通过 kube-proxy 负载均衡器,并将(默认情况下将使用循环)路由到某个 pod服务。