Mar*_*ter 8 java http java-http-client java-11
当我需要以流方式处理响应时,我应该如何处理使用 Java 11 中包含的 HTTP 客户端挂起发送 HTTP 响应正文的服务器?
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();
Run Code Online (Sandbox Code Playgroud)
在上面的代码中:
BodyHandler,当收到状态行和报头时,就认为收到了响应。这意味着当代码执行时,在 7 秒内要么抛出异常,要么到达最后一行。但是,最后一行不受任何超时的限制。如果服务器停止发送响应正文,最后一行将永远阻塞。
在这种情况下,如何防止最后一行挂起?
我的猜测是留给流的消费者,因为这是处理逻辑的一部分,所以身体处理仍然可以用以下方式处理CompletableFuture:
HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);
Run Code Online (Sandbox Code Playgroud)
或者只是简单地Future由 Java 执行Executor。
解决此问题的一种方法是对接收整个正文所需的时间设置超时。这就是MA的解决方案的作用。正如您所注意到的,如果超时,您应该关闭流,以便正确释放连接而不是挂在后台。更通用的方法是实现 a BodySubscriber,当上游在超时时间内未完成时,它会异常完成自身。这使得不必仅仅为了定时等待或关闭流而生成线程。这是一个适当的实现。
class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> downstream;
private final Duration timeout;
private Subscription subscription;
/** Make sure downstream isn't called after we receive an onComplete or onError. */
private boolean done;
TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
this.downstream = downstream;
this.timeout = timeout;
}
@Override
public CompletionStage<T> getBody() {
return downstream.getBody();
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
this.subscription = requireNonNull(subscription);
downstream.onSubscribe(subscription);
// Schedule an error completion to be fired when timeout evaluates
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(this::onTimeout);
}
private synchronized void onTimeout() {
if (!done) {
done = true;
downstream.onError(new HttpTimeoutException("body completion timed out"));
// Cancel subscription to release the connection, so it doesn't keep hanging in background
subscription.cancel();
}
}
@Override
public synchronized void onNext(List<ByteBuffer> item) {
if (!done) {
downstream.onNext(item);
}
}
@Override
public synchronized void onError(Throwable throwable) {
if (!done) {
done = true;
downstream.onError(throwable);
}
}
@Override
public synchronized void onComplete() {
if (!done) {
done = true;
downstream.onComplete();
}
}
static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
}
}
Run Code Online (Sandbox Code Playgroud)
它可以按如下方式使用:
Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
Run Code Online (Sandbox Code Playgroud)
另一种方法是使用读取超时。这更加灵活,因为只要服务器保持活动状态(即不断发送内容),响应就不会超时。BodySubscriber如果在超时内没有收到下一个请求的信号,您将需要一个异常完成自身的函数。这实现起来稍微复杂一些。如果您可以忍受依赖性,则可以使用甲醇。它实现了所描述的读取超时。
Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
Run Code Online (Sandbox Code Playgroud)
另一种策略是结合使用两者:一旦服务器变为非活动状态或正文需要太长时间才能完成,则超时。
| 归档时间: |
|
| 查看次数: |
265 次 |
| 最近记录: |