标签: completable-future

与 CompletablFuture 和 ExecutorService 并行调用方法

我正在尝试对中的getPrice每个方法进行并行调用。我有这段代码并验证了 getPrice 正在单独的线程中运行,但它们是按顺序运行的,而不是并行运行的。谁能指出我在这里缺少什么吗?productproducts

非常感谢你的帮助。

   ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
   Set<Product> decoratedProductSet = products.stream()
   .map(product -> CompletableFuture
                             .supplyAsync(() -> getPrice(product.getId(), date, context), service))
   .map(t -> t.exceptionally(throwable -> null))
   .map(t -> t.join())
   .collect(Collectors.<Product>toSet());
Run Code Online (Sandbox Code Playgroud)

executorservice java-8 completable-future

2
推荐指数
1
解决办法
519
查看次数

使用 CompletionStage 而不是 CompletableFuture

给出以下方法:

private static String getChuckNorrisJoke () {
    try {
        HttpURLConnection con = (HttpURLConnection) new
        URL( "http://api.icndb.com/jokes/random" ).openConnection();
        BufferedReader in = new BufferedReader( new InputStreamReader(con.getInputStream()));
        StringBuilder response = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null ) {
            response.append(line);
        }
        in.close();
        return response.toString();
    } catch (IOException e) {
        throw new IllegalStateException( "Something is wrong: " , e);
    }
}
Run Code Online (Sandbox Code Playgroud)

以下语句可用于以异步方式运行该方法。

final CompletableFuture<String> jokeAsync = CompletableFuture.supplyAsync(() -> getChuckNorrisJoke());
Run Code Online (Sandbox Code Playgroud)

尽管我认为我理解CompletionStage与 的关系CompletableFuture,但我不确定如何使用它CompletionStage来完成相同的任务。

final CompletionStage<String> jokeAsync = …
Run Code Online (Sandbox Code Playgroud)

java lambda completable-future completion-stage

2
推荐指数
1
解决办法
4410
查看次数

Java 8 Completable Future - 并行执行

我正在测试如何CompletableFuture工作。我对如何并行执行任务感兴趣:

try {
          CompletableFuture one = CompletableFuture.runAsync(() -> {
          throw new RuntimeException("error");
          });
          CompletableFuture two = CompletableFuture.runAsync(() -> System.out.println("2"));
          CompletableFuture three = CompletableFuture.runAsync(() -> System.out.println("3"));
          CompletableFuture all = CompletableFuture.allOf(one, two, three);
          all.get();
} catch (InterruptedException e) {
           System.out.println(e);
} catch (ExecutionException e) {
           System.out.println(e);
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,他们将全部被处决。

1 . 当其中一个线程出现异常时,是否可以中断所有正在运行的线程?

2 . 当此代码位于可以从不同线程调用的类方法内部时,它是线程安全的吗?

java multithreading java-8 completable-future

2
推荐指数
1
解决办法
4033
查看次数

.exceptionally() 会捕获从嵌套 future 中抛出的异常吗?或者哪里是正确的放置 .exceptionally()

foo.thenCompose(fooResponse -> {
  ...
  return bar.thenCompose(barResponse -> {
    ...
  });
}).exceptionally(e -> {
  ...
});
Run Code Online (Sandbox Code Playgroud)

这也会.exceptionally()捕获从嵌套bar.thenComposelambda 内部抛出的异常吗?或者我需要这样写:

foo.thenCompose(fooResponse -> {
  ...
  return bar.thenCompose(barResponse -> {
    ...
  }).exceptionally(nestedE -> {
    ...
  });
}).exceptionally(e -> {
  ...
});
Run Code Online (Sandbox Code Playgroud)

然后又吐了?

java future exception java-8 completable-future

2
推荐指数
1
解决办法
839
查看次数

返回一个包含 CompletableFuture 列表的 CompletableFuture

我正在尝试加快对多个 API 的调用。

下面的代码中,getFilteredEvents是当前的同步版本。我有一种感觉,该map(x -> x.getFilteredEvents(eventResearch))操作将等待每个 API(它RestTemplate.exchange()在内部使用)的响应,然后再传递到下一个 API 以构建List<Event>我想要返回的 API。解决方案可能是map在单独的线程上启动调用,但我想尝试一下CompletableFutureAPI。

因此,这getFilteredEventsFaster是我努力改善响应时间的结果。

@Service
public class EventsResearchService {

    @Autowired
    private List<UniformEventsResearchApi> eventsResearchApis;

    // this works, but I'm trying to improve it
    public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
        List<Event> eventsList = eventsResearchApis
                .stream()
                .map(x -> x.getFilteredEvents(eventResearch))
                .flatMap(List::stream)
                .collect(Collectors.toList());

        return extractResponse(eventResearch, eventsList);
    }

    // this doesn't work yet: what is wrong?
    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
        List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
                .parallelStream() …
Run Code Online (Sandbox Code Playgroud)

java spring spring-mvc spring-boot completable-future

2
推荐指数
1
解决办法
7778
查看次数

SupplyAsync 等待所有 CompletableFutures 完成

我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但它join()并没有强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?

CompletableFutures列表只是一个映射supplyAsync的流

List<Integer> items = Arrays.asList(1, 2, 3);

List<CompletableFuture<Integer>> futures = items
                .stream()
                .map(item -> CompletableFuture.supplyAsync(() ->  {

                    System.out.println("processing");
                    // do some processing here
                    return item;

                }))
                .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

我等待期货之类的。

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

我可以等待,futures.forEach(CompletableFuture::join);但我想知道为什么我的流方法不起作用。

java asynchronous java-8 completable-future

2
推荐指数
1
解决办法
8604
查看次数

取消已经运行的 CompletableFutures 的预期模式是什么

我在“Java 8 in Action”中找不到任何关于为什么CompletableFuture故意忽略mayInterruptIfRunning. 但即便如此,我也没有真正看到 custom 的任何钩子cancel(boolean),这在中断不影响阻塞操作(例如 I/O 流、简单锁等)的情况下会派上用场。到目前为止,任务本身似乎是预期的钩子,在抽象级别上工作在Future这里没有任何好处。

因此,我要问的是为了从这种情况中挤出一些 neet 自定义取消机制而必须引入的最少样板代码集。

java completable-future

2
推荐指数
1
解决办法
123
查看次数

如何有效地使用 CompletableFuture 来映射每个输入的异步任务

我想返回包含所有键到值的映射的映射,这些值是 API 对这些键的响应。我正在使用CompletableFutureandGuava为此。以下是我的尝试。是否有其他标准方法可以使用 Java 8 和线程 API 实现相同的效果?

地图之中id -> apiResponse(id)

    
    public static List<String> returnAPIResponse(Integer key) {
        return Lists.newArrayList(key.toString() + " Test");
    }

    public static void main(String[] args) {
        List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);

        List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
            .stream()
            .map(key -> CompletableFuture.supplyAsync(
                () -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
            .collect(Collectors.toList());

        System.out.println(
            futures.parallelStream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

    }

Run Code Online (Sandbox Code Playgroud)

java multithreading asynchronous guava completable-future

2
推荐指数
1
解决办法
7504
查看次数

在 Spring Cloud Sleuth 中保持 CompletableFuture 的跟踪/跨度

我正在将 Sleuth 与CompletableFuture.handle. 例子:

log.info("first");
apnsClient.sendNotification(...).handle((response, cause) -> {
     log.info("second");
});
Run Code Online (Sandbox Code Playgroud)

我希望second日志具有与first日志相同的跟踪 ID。然而,事实并非如此。因此我想知道该怎么办?谢谢!

PS 我无法控制如何apnsClient.sendNotification管理线程(因为它来自Pushy),因此无法使用LazyTraceExecutor之类的东西。

java spring spring-cloud completable-future spring-cloud-sleuth

2
推荐指数
1
解决办法
2570
查看次数

有没有办法计算完成每个 completableFuture 所需的时间?

在详细讨论问题之前,我想先介绍一下问题的背景。基本上,我的代码看起来像

for(int i = 0; i < n; i++){
    startTime = System.currentTimeMillis();
    result = doSomething();
    endTime = System.currentTimeMillis();
    responseTime.add(endTime - startTime);
    results.add(result);
} 
print(results and responseTime);
Run Code Online (Sandbox Code Playgroud)

现在我想做的是将doSomething()作为 completableFuture 运行并获取上面提到的responseTime。但因为这样做是完全错误的——

for(int i = 0; i < n; i++){
    startTime = System.currentTimeMillis();
    resultCF = CompletableFuture.supplyasync(() -> doSomething());
    endTime = System.currentTimeMillis();
    responseTime.add(endTime - startTime);
    results.add(resultCF); //result is to store the completableFutures
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()])).join();
print(i.get() for i in results and responseTimes);
Run Code Online (Sandbox Code Playgroud)

因为它会破坏获取每个doSomething()的执行时间的目的。那么,有什么方法可以获得每个 completableFuture 的响应时间吗?另外,我需要在循环末尾包含 completableFutures( resultCF ) 的结果数组列表。

java time asynchronous completable-future

2
推荐指数
1
解决办法
1866
查看次数