如果占用太多时间,如何取消AsyncRestTemplate HTTP请求?

joh*_*ohn 13 java spring guava resttemplate spring-4

从开始以来,我总是混淆如何处理InterruptedException以及如果他们花费太多时间如何正确取消http请求.我有一个库,我在其中为客户提供了两种方法,sync和async.他们可以调用他们认为适合他们目的的任何方法.

  • executeSync() - 等到我有结果,返回结果.
  • executeAsync() - 立即返回一个Future,如果需要,可以在其他事情完成后处理.

它们将传递DataKey具有用户id和超时值的对象.我们将根据用户ID确定调用哪台计算机,然后使用该计算机创建一个URL,我们将使用AsyncRestTemplate对URL进行http调用,然后根据它是否成功将响应发送给它们.

我正在使用返回a的交换方法,我想要使​​用基于NIO的客户端连接的异步非阻塞架构,以便请求使用非阻塞IO,这就是我使用的原因.这种方法对我的问题定义是否合适?该库将在非常重的负载下用于生产.AsyncRestTemplateListenableFutureAsyncRestTemplate

以下是我的界面:

public interface Client {
    // for synchronous
    public DataResponse executeSync(DataKey key);

    // for asynchronous
    public ListenableFuture<DataResponse> executeAsync(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)

以下是我对界面的实现:

public class DataClient implements Client {

    // using spring 4 AsyncRestTemplate
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    // for synchronous
    @Override
    public DataResponse executeSync(DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;

        try {
            response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // do we need to catch InterruptedException here and interrupt the thread?
            Thread.currentThread().interrupt();
            // also do I need throw this RuntimeException at all?
            throw new RuntimeException("Interrupted", ex);
        } catch (TimeoutException ex) {
            DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
        } catch (Exception ex) {
            DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    // for asynchronous     
    @Override
    public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {

        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        final org.springframework.util.concurrent.ListenableFuture orig = 
            restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);

        orig.addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                        responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
                                DataStatusEnum.ERROR));
                    }
                });

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor());
        return responseFuture;
    }
}
Run Code Online (Sandbox Code Playgroud)

客户将从他们的代码中这样调用 -

// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);

// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);
Run Code Online (Sandbox Code Playgroud)

现在的问题是 -

  1. AsyncRestTemplate如果http请求耗时太长,我们可以中断呼叫吗?我实际上是在我的上面的代码中调用cancel我的方法,但我不知道如何验证它以确保它正在做它应该做的事情?我想将取消传播回原来的未来,以便我可以取消相应的http请求(我可能想要节省资源),这就是为什么我在executeAsync方法中添加了一个监听器.我相信,我们不能打断电话,但不确定我们是否可以这样做.如果让我们说我们可以中断呼叫,那么我是否正在做一切正确的中断http呼叫?或者有更好/更清洁的方法吗?或者我是否需要担心使用我当前的设计取消Http请求?futureexecuteSyncRestTemplateAsyncRestTemplateAsyncRestTemplateAsyncRestTemplate

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor()); 
    
    Run Code Online (Sandbox Code Playgroud)

    使用当前设置,我可以看到它在某些时候(不是每次都)抛出CancellationException - 这是否意味着我的HTTP请求被取消了?

  2. 我也是InterruptedExceptionexecuteSync方法的catch块中做正确的事情吗?如果没有,那么处理这个问题的正确方法是什么.InterruptedException在我的案例中,我是否需要处理?
  3. 默认情况下是否AsyncRestTamplete使用阻塞调用和每个线程请求?如果是,那么在我当前的设置中是否有任何方法可以使用基于NIO的客户端连接?

任何解释/代码建议都会有很大帮助.

jfc*_*edo 9

首先,您为什么使用SettableFuture?为什么不能只返回AsyncRestTemplate返回的ListenableFuture?

1. Can we interrupt AsyncRestTemplate call if http request is taking too long?
Run Code Online (Sandbox Code Playgroud)

你当然可以!你只需要调用Future.cancel方法.此方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行.

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?
Run Code Online (Sandbox Code Playgroud)

正如Phil和Danilo所说,你不需要在InterruptedException catch块中中断当前线程.当必须取消执行请求时,只需执行您需要执行的操作.

实际上,我建议您创建一个处理此行为的方法,例如handleInterruption,并将此方法用于TimeoutExceptionInterruptedException.

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?
Run Code Online (Sandbox Code Playgroud)

是.默认构造函数AsyncRestTamplete是内部使用SimpleClientHttpRequestFactorySimpleAsyncTaskExecutor.

此TaskExecutor始终为每个任务启动威胁,并且永远不会重用Threads,因此效率非常低:

 * TaskExecutor implementation that fires up a new Thread for each task,
 * executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default, the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.
 *
Run Code Online (Sandbox Code Playgroud)

我建议你使用另一种AsyncRestTemplate配置.

您应该使用AsyncRestTemplate的构造函数,该构造函数使用另一个TaskExecutor:

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)
Run Code Online (Sandbox Code Playgroud)

例如:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));
Run Code Online (Sandbox Code Playgroud)

此ExecutorService(Executors.newCachedThreadPool())根据需要创建新线程,但在可用时将重用先前构造的线程.

或者甚至更好,您可以使用另一个RequestFactory.例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory内部使用NIO的方法,只需调用AsyncRestTemplate的正确构造函数:

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())
Run Code Online (Sandbox Code Playgroud)

不要忘记AsyncRestTemplate的内部行为将取决于您如何创建对象.