使用带有回调的Spring 4.0新的ListenableFuture - 奇怪的结果

Bal*_*Bal 6 java spring java.util.concurrent threadpool

我有一个Web应用程序,它接受一系列ID,一次查询每个ID的外部Web服务,并在每个结果通过STOMP代理到达WebSocket客户端时发布.我可以使用简单的Futures来使用它,但我正在尝试使用Spring 4的新ListenableFutures并提供回调.

工作代码使用我的root配置中定义的ThreadPoolTask​​Executor.我有一个名为"SosQuery"的类,其中一个名为"test"的方法使用@Async注释并返回AsyncResult.这是从根上下文服务类调用的工作代码:

@Override
    public void test(String[] oids) throws Exception {
        List<Future<String>> futures = new ArrayList<Future<String>>();

        for (String oid : oids) {
            futures.add(sosQuery.test(oid));
        }

        while (!futures.isEmpty()) {
            List<Future<String>> done = new ArrayList<Future<String>>();
            for (Future<String> future : futures) {
                if (future.isDone()) {
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", future.get());
                    done.add(future);
                }
            }
            futures.removeAll(done);
        }
    }
Run Code Online (Sandbox Code Playgroud)

这工作正常,我看到响应到达我的客户端.我修改了使用@Async注释定义的SosQuery方法,只返回"String",并在我的root配置中创建了一个SimpleAsyncTaskExecutor.以下是使用ListenableFuture的修改方法:

 @Override
    public void test(String[] oids) throws Exception {
        for (final String oid : oids) {
              ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>(){
                @Override
                public String call() throws Exception {
                    String result = sosQuery.test(oid);
                    logger.debug("result for sosQuery: " + result);
                    return result;
                }
            });

            task.addCallback(new ListenableFutureCallback<String>() {

                @Override
                public void onSuccess(String result){
                    if (result == null){
                        result = "ITS NULL";
                    }
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", result);
                }

                @Override
                public void onFailure(Throwable t){
                    logger.error("Error executing callback.", t);
                }
            });
        }
    }
Run Code Online (Sandbox Code Playgroud)

我看到了奇怪的行为......当我在调试模式下部署时,我可以看到正在执行call()方法并且正确地从SosQuery类构建结果,但是我的logger语句永远不会出现在日志中.紧接着,onSuccess方法执行,但结果String为null.

永远不会调用onFailure方法,并且日志中没有任何区别.使用ListableFutures的文档很少并且与AsyncRestTemplate紧密耦合,但是只创建自己的任务很少.有没有人知道我可能做错了什么?

Man*_*ani 6

您应该在SosQuery.test方法中删除@Async.

ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>(){
                @Override
                public String call() throws Exception {
                    String result = sosQuery.test(oid);
                    logger.debug("result for sosQuery: " + result);
                    return result;
                }
            });
Run Code Online (Sandbox Code Playgroud)

这里call()方法内的内容已在单独的线程中调用.如果你在测试方法中有@Async.然后它会创建另一个线程并立即返回(这就是为什么你在测试方法完成之前立即得到响应)

Doc**的另一个重要说明

    This implementation does not reuse threads! Consider a thread-pooling TaskExecutor 
implementation instead, in particular for executing a large number of short-lived tasks.
Run Code Online (Sandbox Code Playgroud)