如何使用AsyncRestTemplate同时进行多个调用?

Gli*_*ide 16 java spring resttemplate

我不明白如何AsyncRestTemplate有效地使用外部服务电话.对于以下代码:

class Foo {

    public void doStuff() {
        Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
                url1, String.class);
        String response1 = future1.get();

        Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                url2, String.class);
        String response2 = future2.get();

        Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
                url3, String.class);
        String response3 = future3.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

理想情况下,我希望同时执行所有3个调用,并在完成所有操作后处理结果.但是,在调用但被阻止之前,不会获取每个外部服务调用.那不就是打败了目的吗?我不妨用.get()get()AsyncRestTemplateRestTemplate

所以我不明白我怎么能让它们同时执行?

dig*_*ise 15

get()在调度所有异步调用之前,不要调用阻塞:

class Foo {
  public void doStuff() {
    ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
    ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

    String response1 = future1.get();
    String response2 = future2.get();
    String response3 = future3.get();
  }
}
Run Code Online (Sandbox Code Playgroud)

你既可以做调度,并获得中环,但要注意,目前的结果是采集效率低下,因为它会卡住的下一个未完成的未来.

您可以将所有期货添加到集合中,并通过它迭代测试每个未来的非阻塞isDone().当该调用返回true时,您可以调用get().

这样,您的整体结果收集将被优化,而不是按照调用get()s 的顺序等待下一个缓慢的未来结果.

更好的是,您可以在每个ListenableFuture返回的内容中注册回调(运行时),AccyncRestTemplate并且您不必担心周期性地检查潜在结果.


小智 6

如果您不必使用'AsyncRestTemplate',我建议使用RxJava.RxJava zip运算符正是您所需要的.检查以下代码:

private rx.Observable<String> externalCall(String url, int delayMilliseconds) {
    return rx.Observable.create(
            subscriber -> {
                try {
                    Thread.sleep(delayMilliseconds); //simulate long operation
                    subscriber.onNext("response(" + url + ") ");
                    subscriber.onCompleted();
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
    );
}

public void callServices() {
    rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread());
    rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread());
    rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3)
            .subscribeOn(Schedulers.newThread())
            .subscribe(response -> System.out.println("done with: " + response));
}
Run Code Online (Sandbox Code Playgroud)

所有对外部服务的请求都将在不同的线程中执行,当最后一次调用完成时,将应用转换函数(在示例中简单的字符串连接),结果(连接字符串)将从'zip'可观察到.


Vik*_*yap 6

我理解你的问题是你有一个预定义的异步方法,你尝试做的是使用RestTemplate类异步调用这个方法.

我写了一个方法,可以帮助你异步调用你的方法.

 public void testMyAsynchronousMethod(String... args) throws Exception {
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick of multiple, asynchronous lookups
        Future<String> future1 = asyncRestTemplate
        .getForEntity(url1, String.class);;
        Future<String> future2 = asyncRestTemplate
        .getForEntity(url2, String.class);
        Future<String> future3 = asyncRestTemplate
        .getForEntity(url3, String.class);

        // Wait until they are all done
        while (!(future1 .isDone() && future2.isDone() && future3.isDone())) {
            Thread.sleep(10); //10-millisecond pause between each check
        }

        // Print results, including elapsed time
        System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());
    }
Run Code Online (Sandbox Code Playgroud)


Kuv*_*dis 1

您可能想要使用CompletableFuture类(javadoc)。

  1. 将您的通话转换为CompletableFuture. 例如。

    final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    });
    
    Run Code Online (Sandbox Code Playgroud)
  2. 接下来CompletableFuture::allOf使用您新创建的 3 个可完成的 future 调用方法。

  3. join()对结果调用方法。解决生成的可完成未来后,您可以从在步骤 3 中创建的每个单独的可完成未来中获取结果。