我有以下代码,它使用 WebClient 进行 HTTP 调用。
webClient.post()
.uri("/users/track")
.body(BodyInserters.fromObject(getUserTrackPayload(selection, customAttribute, partyId).toString()))
.header(CONTENT_TYPE, APPLICATION_JSON)
.retrieve()
.onStatus(httpStatus -> !CREATED.equals(httpStatus),
response -> response.bodyToMono(String.class)
.flatMap(body -> buildErrorMessage(response.statusCode().value(), body, partyId,
customAttribute)
.flatMap(e -> Mono.error(new MyException(e)))))
.bodyToMono(Object.class)
.map(o -> (JsonObject)new Gson().toJsonTree(o))
.flatMap(body -> body.get("message") != null && body.get("message").getAsString().equalsIgnoreCase("success")
&& body.get("attributes_processed") != null && body.get("attributes_processed").getAsInt() == 1
? Mono.just(body)
: buildErrorMessage(CREATED.value(), body.toString(), partyId, customAttribute)
.flatMap(e -> Mono.error(new MyException(e))));
Run Code Online (Sandbox Code Playgroud)
一段时间后(比如 10 分钟)第一次调用此代码时,我收到以下日志。但是,调用成功并输出了正确的结果。
io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)
2019-03-19 03:11:45,625 WARN [:::] [reactor-http-epoll-8] reactor.netty.http.client.HttpClientConnect : …Run Code Online (Sandbox Code Playgroud) 我有以下错误处理RestTemplate:
try {
restTemplate.postForObject(..);
} catch (ResourceAccessException e) {
throw new CustomException("host is down");
}
Run Code Online (Sandbox Code Playgroud)
问题:我怎样才能用 spring 达到同样的效果WebClient?
try {
webClient.post()...block();
} catch (Exception e) {
//cannot check due to package private access
//if (e instanceof Exceptions.ReactiveException)
if (e.getCause() instanceof java.net.ConnectException) {
throw new CustomException("host is down");
}
}
Run Code Online (Sandbox Code Playgroud)
问题:我不能直接捕捉,ConnectionException因为它被包裹在ReactiveException. 我能比instanceof对任何真正的潜在异常应用多次检查做得更好吗?
如果远程服务阻塞,我可以发送多少个并发请求?意思是:spring在使用时内部使用的maxConnection池限制是WebClient多少?
@Autowired
private WebClient webClient;
webClient.post().uri(url).syncBody(req).retrieve().bodyToMono(type);
Run Code Online (Sandbox Code Playgroud)
此外:我该如何修改它?
我正面临与WebClient和的问题reactor-extra。确实,我有以下方法:
public Employee getEmployee(String employeeId) {
return webClient.get()
.uri(FIND_EMPLOYEE_BY_ID_URL, employeeId)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, clientResponse -> Mono.empty())
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new MyCustomException("Something went wrong calling getEmployeeById")))
.bodyToMono(Employee.class)
.retryWhen(Retry.onlyIf(ConnectTimeoutException.class)
.fixedBackoff(Duration.ofSeconds(10))
.retryMax(3))
.block();
}
Run Code Online (Sandbox Code Playgroud)
我发现我可以使用,retryWhen(Retry.onlyIf(...))因为我只想在ConnectTimeoutException抛出a 时重试。我从这篇文章中找到了这个解决方案:spring webclient: retry with backoff on specific error
但是,在reactor以下方法的最新版本中已弃用:
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory)
谷歌搜索后,我还没有发现任何解决这个问题的时间:是否有任何替代retryWhen和Retry.onlyIf使用的最新版本reactor
谢谢你的帮助 !
reactive-programming spring-boot project-reactor reactor-netty spring-webclient
以下场景:
我有两个微服务 A 和 B。服务 A 是一个承载客户端,它有一个开放的 api 并接收来自必须由 keycloak 授权的客户端的请求。现在我想从服务 A 向服务 B 发送一个授权请求,它也是一个承载客户端。
我想过在 webclient builder 过程中将功能添加为过滤器功能,例如
@Bean
WebClient webClient() {
return WebClient.builder()
.filter(authHeader())
.build();
}
private ExchangeFilterFunction authHeader(String token) {
return (request, next) -> next.exchange(ClientRequest.from(request).headers((headers) -> {
headers.setBearerAuth(token);
}).build());
}
Run Code Online (Sandbox Code Playgroud)
这是我在另一个问题中找到的一个例子。这对我来说似乎是正确的方法,但我可以在配置的那个阶段提供“字符串令牌”参数吗?我只是从 RestTemplate 切换到 Webclient,很抱歉,这是一个转储问题。
编辑:我可以在构建新的 Webclient 时手动设置标题。
return WebClient.builder().defaultHeader("Authorization", "Bearer "+ context.getTokenString()).build();
Run Code Online (Sandbox Code Playgroud)
正如我从 RestTemplate 知道的,它可以用作单例。还有一个 KeyCloakRestTemplate 会自动注入标头。
Webclient 是不可变的,所以当我注入它时,我不能只使用它并在之后添加标头。此外,我无法在启动时设置此标头,因为我必须等待获取承载标头并将其传入的请求。所以我想除了这样做之外没有其他方法吗?
我想将 WebClient 调用的指标从服务公开到下游系统,需要诸如请求计数、最短、最长时间响应之类的指标。
我想知道如何为反应式网络客户端编写仪表。
这是一个 MeterBinder 示例,我有兴趣将其与 Webclient 一起使用。
class Metrics : MeterBinder {
override fun bindTo(registry: MeterRegistry) {
Gauge.builder("metrics", Supplier { Math.random() })
.baseUnit("status")
.register(registry)
}
}
Run Code Online (Sandbox Code Playgroud) spring reactive-programming kotlin prometheus spring-webclient
我有一个用户名列表,想要从远程服务获取用户详细信息而不阻塞主线程。我正在使用 Spring 的反应式客户端 WebClient。对于响应,我获取 Mono,然后订阅它并打印结果。
private Mono<User> getUser(String username) {
return webClient
.get()
.uri(uri + "/users/" + username)
.retrieve()
.bodyToMono(User.class)
.doOnError(e ->
logger.error("Error on retrieveing a user details {}", username));
}
Run Code Online (Sandbox Code Playgroud)
我通过两种方式实现了这个任务:
使用Java stream
usernameList.stream()
.map(this::getUser)
.forEach(mono ->
mono.subscribe(System.out::println));
Run Code Online (Sandbox Code Playgroud)
使用Flux.fromIterable:
Flux.fromIterable(usernameList)
.map(this::getUser)
.subscribe(mono ->
mono.subscribe(System.out::println));
Run Code Online (Sandbox Code Playgroud)
看来主线程并没有以两种方式被阻塞。Java Stream在这种情况下和有什么区别Flux.fromIterable?如果两者都在做同样的事情,建议使用哪一个?
在我的项目中,需要从外部服务下载文件并将其存储在某个路径中。所以我RestTemplate在 spring中使用将文件下载为 Byte[] 。但是这个操作消耗了我的内存消耗。我在这个博客https://inneka.com/programming/spring/spring-webclient-how-to-stream-large-byte-to-file/ 中发现了关于使用 spring webClient 并将文件写入 path 。由于我关注的文件是我使用的音频文件APPLICATION_OCTET_STREAM。以下代码不起作用的问题。我对反应式编程非常陌生,任何对此的提示都会有所帮助。
public Mono<void> testWebClientStreaming(String filename, String format) throws IOException {
WebClient webClient = WebClient.create("https://stackoverflow.com");
String username = "Username";
String token = "Password";
String path = DOWNLOAD_TEMP_PATH + PATH_SEPERATOR + filename +"."+ format;
Flux<DataBuffer> stream = webClient
.get()
.uri("/files/{filename}.{format}",
filename, format)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Authorization", "Basic " + Base64Utils.encodeToString((username + ":" + token).getBytes(UTF_8)))
.retrieve()
.bodyToFlux(DataBuffer.class);
return DataBufferUtils.write(stream, asynchronousFileChannel)
.doOnComplete(() -> System.out.println("\n\n\n\n\n\n FINISHED "))
.doOnNext(DataBufferUtils.releaseConsumer())
.doAfterTerminate(() -> {
try …Run Code Online (Sandbox Code Playgroud) java reactive-programming spring-boot spring-webflux spring-webclient
我们使用org.springframework.web.reactive.function.client.WebClientwith
reactor.netty.http.client.HttpClient作为 Spring 5.1.9 的一部分来使用该exchange()方法发出请求。此方法的文档强调了以下几点:
...在使用 exchange() 时,无论场景如何(成功、错误、意外数据等),应用程序都有责任使用任何响应内容。不这样做会导致内存泄漏。
我们对 的使用exchange()相当基本,但我不清楚错误场景的文档,我想确定我们是否正确地为所有结果释放了资源。本质上,我们有一个阻塞实现,它发出请求并返回ResponseEntity无论响应代码如何:
try {
...
ClientResponse resp = client.method(method).uri(uri).syncBody(body).exchange().block();
ResponseEntity<String> entity = resp.toEntity(String.class).block();
return entity;
} catch (Exception e) {
// log error details, return internal server error
}
Run Code Online (Sandbox Code Playgroud)
如果我理解实现,exchange()无论响应代码如何(例如 4xx、5xx),如果请求成功发送,将始终给我们一个响应。在这种情况下,我们只需要调用toEntity()即可使用响应。我担心的是错误情况(例如,无响应、低级连接错误等)。上述异常处理是否会捕获所有其他场景,并且它们中的任何一个都有需要消耗的响应?
注意:ClientResponse.releaseBody()仅在 5.2 中引入
我是反应式编程的新手,我正在使用 Spring WebFlux 的 WebClient 向以下 URL 发出 POST 请求,作为我的 Spring Boot 应用程序的一部分,以将现有测验分配给候选人。我无法理解我在构建 WebClient 请求时做错了什么。
终点
https://www.flexiquiz.com/api/v1/users/{user_id}/quizzes
在我的请求正文中,我需要传递从另一个 API 获得的测验 ID(工作正常)。
{
"quiz_id": ""
}
Run Code Online (Sandbox Code Playgroud)
除了传递请求正文之外,我还传递X-API-KEY作为请求标头的一部分。
但是,当我尝试到达终点时,我收到了{"message":"400: Bad Request"}错误。
下面是我的代码。
测验请求.java
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class QuizRequest {
@JsonProperty("quiz_id")
@NotBlank
private String quizId;
public QuizRequest(@NotBlank String quizId) {
this.quizId = quizId;
}
}
Run Code Online (Sandbox Code Playgroud)
FlexiQuizClient.java
@Service
@Slf4j
public class FlexiQuizClient {
private static final String USER_AGENT = "WebClient for FlexiQuiz";
private final WebClient webClient;
@Value("${flexiquiz.baseurl}")
private String FLEXIQUIZ_API_BASE_URL; …Run Code Online (Sandbox Code Playgroud) spring-webclient ×10
java ×7
spring ×6
spring-boot ×4
java-stream ×1
kotlin ×1
oauth ×1
prometheus ×1
reactor ×1
webclient ×1