如何在多线程环境中更好地使用ExecutorService?

joh*_*ohn 10 java multithreading daemon callable executorservice

我需要创建一个库,在其中我将有同步和异步方法.

  • executeSynchronous() - 等到我有结果,返回结果.
  • executeAsynchronous() - 立即返回Future,如果需要,可在其他事情完成后处理.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它.然后我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建DataResponse对象将该JSON字符串发送回我们的客户.有些客户会打电话executeSynchronous(),有些人可能会打电话executeAsynchronous(),这就是为什么我需要在我的库中单独提供两种方法.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)

然后我有我DataClient实现上面的Client接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}
Run Code Online (Sandbox Code Playgroud)

将执行实际任务的简单类:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) { // should I catch RuntimeException or just Exception here?
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}
Run Code Online (Sandbox Code Playgroud)

我对上述解决方案几乎没有疑问 -

  • 我是否应该为上述用例使用守护程序或非守护程序线程?
  • 此外,我正在终止已经超时的任务,以便它长时间不占用我有限的10个线程之一.这看起来像我做的那样吗?
  • 在我的call()方法中,我正在捕捉异常.我应该去RuntimeException那里吗?如果我捕获Exception或RuntimeException有什么区别?

当我开始研究这个解决方案时,我并没有终止已经超时的任务.我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一).所以我在网上进行了一些研究,发现我可以通过在将来使用取消来取消已经超时的任务,如下所示 -

future.cancel(true);
Run Code Online (Sandbox Code Playgroud)

但是我想确定,我在我的executeSynchronous方法中取消已经超时的任务的方式是否正确?

由于我打电话cancel()Future,将停止运行,如果任务仍然是在排队,所以我不知道我在做什么正确与否?这样做的正确方法是什么?

如果有更好的方法,那么任何人都能提供一个例子吗?

我们应该永远终止已经超时的任务吗?如果我们不这样做那么可能会产生什么影响?

Gra*_*ray 8

我是否应该为上述用例使用守护程序或非守护程序线程?

这取决于您是否希望这些线程停止程序退出.当最后一个非守护程序线程完成时,JVM将退出.

如果JVM存在,那么任何时候都可以杀死这些任务,那么它们应该是守护进程.如果您希望JVM等待它们,那么将它们设为非守护进程.

请参阅:java守护程序线程和非守护程序线程

此外,我正在终止已经超时的任务,以便它长时间不占用我有限的10个线程之一.这看起来像我做的那样吗?

是的,不是.你是正确的调用cancel()Future,将停止其运行是否仍然在队列中.但是,如果线程已经在运行任务,那么取消将只是中断线程.有可能restTemplate调用不可中断,因此中断将被忽略.只有某些方法(比如Thread.sleep(...)可中断和抛出InterruptException.所以调用future.cancel(true)不会停止操作并终止线程.

请参阅:线程不中断

你可以做的一件事是cancel()在你的Task对象上放一个强制关闭的方法restTemplate.你需要试验一下.另一个想法是在restTemplate连接或IO 上设置某种超时,这样它就不会永远等待.

如果您正在使用Spring,RestTemplate那么就没有直接关闭,但您可以关闭我认为可能通过的底层连接,SimpleClientHttpRequestFactory因此您需要调用disconnect()底层连接HttpURLConnection.

在我的call()方法中,我正在捕获异常.我应该去RuntimeException那里吗?

RuntimeException延伸Exception所以你已经抓住了它们.

如果我抓到Exception或者有RuntimeException什么区别?

捕获Exception捕获已检查(非运行时)异常运行时异常.仅仅捕获RuntimeException意味着任何已定义的异常都不会被捕获并且将被该方法抛出.

RuntimeExceptions是特殊的例外,不需要由代码检查.例如,任何代码都可以在IllegalArgumentException不定义方法的情况下抛出throws IllegalArgumentException.对于已检查的异常,如果调用者方法未捕获或抛出已检查的异常,则编译器错误,但RuntimeExceptions 不是这样.

这是关于这个主题的一个很好的答案:


小智 3

对于上述用例,我应该使用守护线程还是非守护线程?

这取决于。但在这种情况下,我更喜欢守护线程,因为使用允许进程退出的客户端很方便。

我这样做看起来正确吗?

不,事实并非如此。中断 IO 任务非常困难。也尝试在 RestTemplate 中设置超时。在这种情况下取​​消未来似乎毫无意义。

如果捕获 Exception 或 RuntimeException 有什么区别?

如果您在 try 块中没有检查异常,则没有区别:) 只是因为在这种情况下只有 RuntimeExceptions 可能。

还有一个更重要的注意事项:将同步调用实现为异步+等待是一个坏主意。它没有意义,每次调用都会消耗线程池中的一个线程。只需创建任务实例并在当前线程中调用它即可!