一旦我的线程中断,我怎么能中断RestTemplate调用?

joh*_*ohn 5 java multithreading executorservice interrupted-exception resttemplate

我需要创建一个库,在其中我将具有同步和异步功能.

  • executeSynchronous() - 等到我有结果,返回结果.
  • executeAsynchronous() - 立即返回Future,如果需要,可在其他事情完成后处理.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它.然后我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建DataResponse对象将该JSON字符串发送回我们的客户.有些客户会打电话executeSynchronous(),有些人可能会打电话executeAsynchronous(),这就是为什么我需要在我的库中单独提供两种方法.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)

然后我有我DataClient实现上面的Client接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this looks right?
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}
Run Code Online (Sandbox Code Playgroud)

将执行实际任务的简单类:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}
Run Code Online (Sandbox Code Playgroud)

当我开始研究这个解决方案时,我并没有终止已经超时的任务.我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一).所以我在网上做了一些研究,我发现我可以通过使用取消来取消已经超时的任务,如下所示 -

future.cancel(true);
Run Code Online (Sandbox Code Playgroud)

但是,如果我按照上面的解决方案所示这样做,那么我是否需要在RestTemplate线程中断时关闭任何其他资源?如果是,那我该怎么做?还有,我们可以打断RestTemplate电话吗?因为我一旦任务超时就尝试取消取消我的未来,但我猜我的线程没有被打断.

我们应该永远终止已经超时的任务吗?如果我们不这样做那么可能会产生什么影响?它会影响我的表现吗?

我目前的设置是否有更好的解决方案来处理这种情况?

eri*_*son 5

似乎是一个呼叫RestTemplate不能被中断或取消。即使使用使用回调的“kludge”,RestTemplate也可能在内部锁定资源,在调用回调之前等待响应。

当底层套接字可访问时,可以通过从另一个线程关闭套接字来中止网络 I/O。例如,可以在超时过后启动计时器以关闭套接字。或者,如果您想要一个对中断敏感的无限期超时(例如,由于用户按下“取消”按钮),您可以提交一个无限期等待但通过关闭套接字来响应中断的任务。

不幸的是,它的作者似乎没有RestTemplate提供此功能。

是的,您应该清理由于任务取消或到期而不再需要的资源。是的,它会影响性能。如果您的线程池的线程数量有限,最终所有线程都将卡在已失效的任务中。如果它有无限数量的线程,最终内存将被耗尽。


Mar*_*zak 4

有时无法中断线程,特别是当线程在 Socket 上执行阻塞操作时。

因此,您应该在 http 连接上设置超时,而不是在超时时取消任务。

不幸的是,每个连接工厂和 RestTemplate 都设置了超时,因此每个请求必须使用它自己的 RestTemplate。

您可以为每个任务创建新的 RestTemplate,或者使用 ThreadLocal 或资源池重用之前创建的模板。

例如,使用 Thread local 的任务可能如下所示:

    public class Task implements Callable<DataResponse> {

    private DataKey key;

    private ThreadLocal<RestTemplate> restTemplateThreadLocal =
            ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory()));

    public Task(DataKey key) {
        this.key = key;
    }

    private SimpleClientHttpRequestFactory getConnectionFactory(){
        return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory();
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            //it is up to you, how to set connection and read timeouts from provided key.getTimeout
            getConnectionFactory().setConnectTimeout(1000);
            getConnectionFactory().setReadTimeout(key.getTimeout());
            response = restTemplateThreadLocal.get().getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
   }
Run Code Online (Sandbox Code Playgroud)

顺便提一句。Spring还提供了AsyncRestTemplate,这可能会让你的代码更简单。如果与 Netty4ClientHttpRequestFactory 一起使用,您可以获得基于 NIO 的客户端连接。在这种情况下,即使在建立 Http 连接时,您也应该能够中断您的任务。

下面的简短示例。它使用NIO,因此你不必关心超时后请求是否真的被取消。

        URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
        Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
        ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
        System.out.println("entity.get() = " + entity.get());
        asyncRequestFactory.destroy();
Run Code Online (Sandbox Code Playgroud)