标签: completable-future

Java 8 CompletableFuture与Netty Future

CompletableFutureJDK 8中引入的内容与io.netty.util.concurrent.FutureNetty提供的内容相比如何?

Netty文档提到了这一点

JDK 8添加了CompletableFuture,它与http://netty.io/wiki/using-as-a-generic-library.html有些重叠 io.netty.util.concurrent.Future

我试图得到答案的问题是:

  1. 他们的异同是什么?
  2. 两者的性能特征如何不同?哪一个能够更好地扩展?

关于相似点/不同点,我能够提出以下建议:

相似之处: 基本的相似之处在于,与Java Future相比,两者都是非阻塞的.这两个类都有可用于向未来添加侦听器,内省失败和任务成功的方法,并从任务中获取结果.

差异: CompletableFuture似乎有更丰富的界面来组合多个异步活动等.io.netty.util.concurrent.Future另一方面,Netty 允许将多个侦听器添加到同一个Future,而且允许删除侦听器.

java nonblocking netty java-8 completable-future

10
推荐指数
1
解决办法
2302
查看次数

如何实现 CompletableFuture.allOf() 在任何 future 失败时异常完成?

CompletableFuture.allOf()我想实现和的混合,CompletableFuture.anyOf()其中一旦所有元素成功完成,返回的 future 就会成功完成,或者只要任何元素异常完成,它就会异常完成(具有相同的异常)。在多个元素失败的情况下,返回其中任何一个的异常就足够了。

用例

我有一个任务需要聚合CompletableFutures 列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。

相关问题

我发现等待未来的列表最初看起来像是一个重复的问题,但接受的答案使用CompletionService需要CallableRunnable作为输入。我正在寻找一种将已运行的CompletionStages 作为输入的解决方案。

java completable-future

10
推荐指数
1
解决办法
7575
查看次数

Mono vs CompletableFuture

CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数.假设我有一个API调用CompletableFuture.那是一个API调用阻塞吗?线程是否会被阻塞,直到它没有得到API的响应?(我知道主线程/ tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)

据我所知,单声道完全没有阻挡.

如果我错了,请详细说明并纠正我.

java reactive-programming project-reactor completable-future

10
推荐指数
2
解决办法
1735
查看次数

如何在可完成的 future 流中展平列表?

我有这个:

Stream<CompletableFuture<List<Item>>>
Run Code Online (Sandbox Code Playgroud)

我怎样才能将它转换为

Stream<CompletableFuture<Item>>
Run Code Online (Sandbox Code Playgroud)

其中:第二个流由第一个流中每个列表内的每个项目组成。

我研究了一下thenCompose,但这解决了一个完全不同的问题,也称为“扁平化”。

如何以流方式高效地完成此操作,而不阻塞或过早消耗不必要的流项目?

这是迄今为止我最好的尝试:

    ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
    Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;

    @SuppressWarnings("unchecked")
    CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
    CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
    for(CompletableFuture<List<IncomingItem>> item: allFutures) {
        queue.submit(item::get);
    }
    List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
    CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
        queue.submit(() -> THE_END);
        return THE_END;
    });
    queue.submit(() -> ender.get());
    Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
        boolean checkNext = true;
        List<IncomingItem> next = null;
        @Override
        public boolean hasNext() {
            if(checkNext) {
                try { …
Run Code Online (Sandbox Code Playgroud)

java parallel-processing concurrency java-stream completable-future

10
推荐指数
1
解决办法
1595
查看次数

CompletableFuture的单独异常处理

我意识到我希望我们的API的消费者不必处理异常.或者更清楚一点,我想确保始终记录异常,但只有消费者才知道如何处理成功.我希望客户端能够处理异常,如果他们想要的话,File我无法返回它们.

注意:FileDownload是一个Supplier<File>

@Override
public CompletableFuture<File> processDownload( final FileDownload fileDownload ) {
    Objects.requireNonNull( fileDownload );
    fileDownload.setDirectory( getTmpDirectoryPath() );
    CompletableFuture<File> future = CompletableFuture.supplyAsync( fileDownload, executorService );
    future... throwable -> {
        if ( throwable != null ) {
            logError( throwable );
        }
        ...
        return null; // client won't receive file.
    } );
    return future;

}
Run Code Online (Sandbox Code Playgroud)

我真的不明白这些CompletionStage东西.我使用exceptionhandle?我会回归原来的未来还是他们回来的未来?

java java-8 completable-future

9
推荐指数
1
解决办法
9353
查看次数

9
推荐指数
2
解决办法
5404
查看次数

What are the ways to pass threadpoolexecutor to CompletableFuture?

I have been working on Java CompletableFuture lately and found , we should always use customized Threadpool. With it, I found two ways of passing threadpool to existing code. Like below

This is my ThreadPool in configuration file

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}
Run Code Online (Sandbox Code Playgroud)

1. Passing existingThreadPool in argument.

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);
Run Code Online (Sandbox Code Playgroud)

2. Using async like below

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}
Run Code Online (Sandbox Code Playgroud)

is there any …

java multithreading spring-boot completable-future

9
推荐指数
1
解决办法
5605
查看次数

为什么`parallelStream` 比`CompletableFuture` 实现更快?

我想在某个操作上提高后端 REST API 的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部扁平化为一个响应列表。

最近刚刚了解了CompletableFutures,我决定试一试,并将该解决方案与仅将 my 更改stream为 a 的解决方案进行比较parallelStream

这是用于基准测试的代码:

    package com.alithya.platon;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;


    public class ConcurrentTest {

        static final List<String> REST_APIS =
                Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
        MyTestUtil myTest = new MyTestUtil();
        long millisBefore; // used to benchmark

        @BeforeEach
        void setUp() {
            millisBefore = System.currentTimeMillis();
        }

        @AfterEach
        void tearDown() {
            System.out.printf("time taken …
Run Code Online (Sandbox Code Playgroud)

java benchmarking java-stream completable-future

9
推荐指数
1
解决办法
1161
查看次数

是否可以在没有锁的情况下快速解决多线程银行帐户问题?

我正在尝试解决多线程银行帐户问题*,而不使用锁,而是使用多版本并发控制。它正在工作。只是有点慢。我怎样才能加快速度?

(*) 我有 5 个用户,每个用户从 200 开始 - 每个人随机提取 100 并将 100 存入另一个用户拥有的另一个银行账户。我预计到运行结束时银行余额总计为 1000。不应损失或创造任何金钱。这部分适用于我下面的实现。

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

public class ConcurrentWithdrawer {

    private Map<String, Integer> database = new HashMap<>();
    private int transactionCount = 0;
    private final List<Transaction> transactions = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) {
        try {
            new ConcurrentWithdrawer().run();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static int getRandomNumberInRange(int min, int max) {

        if (min >= …
Run Code Online (Sandbox Code Playgroud)

java concurrency multithreading completable-future

9
推荐指数
1
解决办法
341
查看次数

如何使用 CompletableFuture 并行运行多个服务调用?

我正在向用户返回以下响应

class FinalResponseDTO {
    List<Service1ResponseDTO> service1ResponseDTO;
    Long totalCount;
    List<Service2ResponseDTO> service2ResponseDTO;
}
Run Code Online (Sandbox Code Playgroud)

截至目前,我正在进行三个连续调用来计算 this FinalResponseDTO,每个调用都可以独立于其他调用运行。我尝试制作三种不同CompletableFuture的:

CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
Run Code Online (Sandbox Code Playgroud)

如果我这样做CompletableFuture.allOf(future1, future2, future3).join();或我应该打电话吗CompletableFuture.allOf(future1, future2, future3).get();?即使我调用其中任何一个join,或者get我应该如何FinalResponseDTO从中构建。我对 Java 8 并发功能不熟悉,例如,CompletableFuture我很困惑,因为每个 future 的返回类型都不同,我应该如何获得所有这些 future 的组合响应,然后构造我的最终输出?

java multithreading asynchronous completable-future

9
推荐指数
1
解决办法
1万
查看次数