我一直在将现有应用程序迁移到Spring Cloud的服务发现,Ribbon负载平衡和断路器。该应用程序已经广泛使用RestTemplate,并且我已经能够成功使用模板的负载平衡版本。但是,我一直在测试服务有两个实例的情况,并且将其中一个实例停运。我希望RestTemplate故障转移到下一个服务器。根据我所做的研究,似乎故障切换逻辑存在于Feign客户端以及使用Zuul时。看来LoadBalancedRest模板没有用于故障转移的逻辑。在深入研究代码时,RibbonClientHttpRequestFactory似乎正在使用netflix RestClient(似乎具有进行重试的逻辑)。
那么,我从哪里去工作呢?
我宁愿不使用Feign客户端,因为我必须清除很多代码。我发现此链接建议将@Retryable注释与@HystrixCommand一起使用,但这似乎应该成为负载平衡的其余模板的一部分。
我做了一些深入研究RibbonClientHttpRequestFactory.RibbonHttpRequest的代码:
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
try {
addHeaders(headers);
if (outputStream != null) {
outputStream.close();
builder.entity(outputStream.toByteArray());
}
HttpRequest request = builder.build();
HttpResponse response = client.execute(request, config);
return new RibbonHttpResponse(response);
}
catch (Exception e) {
throw new IOException(e);
}
}
Run Code Online (Sandbox Code Playgroud)
看来,如果我重写此方法并将其更改为使用“ client.executeWithLoadBalancer()”,那么我也许能够利用RestClient内置的重试逻辑?我想我可以创建自己的RibbonClientHttpRequestFactory版本来执行此操作?
只是寻找最佳方法的指导。
谢谢
要回答我自己的问题:
在详细介绍之前,请注意一个警告故事:
尤里卡(Eureka)的自我保存模式使我在本地计算机上测试故障转移时陷入了困境。我建议在进行测试时关闭自我保存模式。因为我要定期删除节点,然后重新启动(使用随机值使用不同的实例ID),所以我跳了Eureka的自我保存模式。我最终在Eureka中获得了许多实例,这些实例指向同一台机器,同一端口。故障转移实际上正在运行,但是选择的下一个节点恰巧是另一个失效实例。起初非常令人困惑!
我能够使用RibbonClientHttpRequestFactory的修改版进行故障转移。因为RibbonAutoConfiguration使用此工厂创建一个负载平衡的RestTemplate,而不是注入此rest模板,所以我使用请求工厂的修改后的版本创建了一个新模板:
protected RestTemplate restTemplate;
@Autowired
public void customizeRestTemplate(SpringClientFactory springClientFactory, LoadBalancerClient loadBalancerClient) {
restTemplate = new RestTemplate();
// Use a modified version of the http request factory that leverages the load balacing in netflix's RestClient.
RibbonRetryHttpRequestFactory lFactory = new RibbonRetryHttpRequestFactory(springClientFactory, loadBalancerClient);
restTemplate.setRequestFactory(lFactory);
}
Run Code Online (Sandbox Code Playgroud)
修改后的“请求工厂”只是RibbonClientHttpRequestFactory的副本,其中有两个小的更改:
1)在createRequest中,我从负载均衡器中删除了选择服务器的代码,因为RestClient会为我们做到这一点。2)在内部类RibbonHttpRequest中,我将executeInternal更改为调用“ executeWithLoadBalancer”。
全班:
@SuppressWarnings("deprecation")
public class RibbonRetryHttpRequestFactory implements ClientHttpRequestFactory {
private final SpringClientFactory clientFactory;
private LoadBalancerClient loadBalancer;
public RibbonRetryHttpRequestFactory(SpringClientFactory clientFactory, LoadBalancerClient loadBalancer) {
this.clientFactory = clientFactory;
this.loadBalancer = loadBalancer;
}
@Override
public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
String serviceId = originalUri.getHost();
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
RestClient client = clientFactory.getClient(serviceId, RestClient.class);
HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
}
public class RibbonHttpRequest extends AbstractClientHttpRequest {
private HttpRequest.Builder builder;
private URI uri;
private HttpRequest.Verb verb;
private RestClient client;
private IClientConfig config;
private ByteArrayOutputStream outputStream = null;
public RibbonHttpRequest(URI uri, HttpRequest.Verb verb, RestClient client, IClientConfig config) {
this.uri = uri;
this.verb = verb;
this.client = client;
this.config = config;
this.builder = HttpRequest.newBuilder().uri(uri).verb(verb);
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(verb.name());
}
@Override
public URI getURI() {
return uri;
}
@Override
protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
if (outputStream == null) {
outputStream = new ByteArrayOutputStream();
}
return outputStream;
}
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
try {
addHeaders(headers);
if (outputStream != null) {
outputStream.close();
builder.entity(outputStream.toByteArray());
}
HttpRequest request = builder.build();
HttpResponse response = client.executeWithLoadBalancer(request, config);
return new RibbonHttpResponse(response);
}
catch (Exception e) {
throw new IOException(e);
}
//TODO: fix stats, now that execute is not called
// use execute here so stats are collected
/*
return loadBalancer.execute(this.config.getClientName(), new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(ServiceInstance instance) throws Exception {}
});
*/
}
private void addHeaders(HttpHeaders headers) {
for (String name : headers.keySet()) {
// apache http RequestContent pukes if there is a body and
// the dynamic headers are already present
if (!isDynamic(name) || outputStream == null) {
List<String> values = headers.get(name);
for (String value : values) {
builder.header(name, value);
}
}
}
}
private boolean isDynamic(String name) {
return name.equals("Content-Length") || name.equals("Transfer-Encoding");
}
}
public class RibbonHttpResponse extends AbstractClientHttpResponse {
private HttpResponse response;
private HttpHeaders httpHeaders;
public RibbonHttpResponse(HttpResponse response) {
this.response = response;
this.httpHeaders = new HttpHeaders();
List<Map.Entry<String, String>> headers = response.getHttpHeaders().getAllHeaders();
for (Map.Entry<String, String> header : headers) {
this.httpHeaders.add(header.getKey(), header.getValue());
}
}
@Override
public InputStream getBody() throws IOException {
return response.getInputStream();
}
@Override
public HttpHeaders getHeaders() {
return this.httpHeaders;
}
@Override
public int getRawStatusCode() throws IOException {
return response.getStatus();
}
@Override
public String getStatusText() throws IOException {
return HttpStatus.valueOf(response.getStatus()).name();
}
@Override
public void close() {
response.close();
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4982 次 |
| 最近记录: |