CompletableFutureJDK 8中引入的内容与io.netty.util.concurrent.FutureNetty提供的内容相比如何?
Netty文档提到了这一点
JDK 8添加了CompletableFuture,它与http://netty.io/wiki/using-as-a-generic-library.html有些重叠
io.netty.util.concurrent.Future
我试图得到答案的问题是:
关于相似点/不同点,我能够提出以下建议:
相似之处: 基本的相似之处在于,与Java Future相比,两者都是非阻塞的.这两个类都有可用于向未来添加侦听器,内省失败和任务成功的方法,并从任务中获取结果.
差异:
CompletableFuture似乎有更丰富的界面来组合多个异步活动等.io.netty.util.concurrent.Future另一方面,Netty 允许将多个侦听器添加到同一个Future,而且允许删除侦听器.
CompletableFuture.allOf()我想实现和的混合,CompletableFuture.anyOf()其中一旦所有元素成功完成,返回的 future 就会成功完成,或者只要任何元素异常完成,它就会异常完成(具有相同的异常)。在多个元素失败的情况下,返回其中任何一个的异常就足够了。
我有一个任务需要聚合CompletableFutures 列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。
我发现等待未来的列表最初看起来像是一个重复的问题,但接受的答案使用CompletionService需要Callable或Runnable作为输入。我正在寻找一种将已运行的CompletionStages 作为输入的解决方案。
CompletableFuture在单独的线程上执行任务(使用线程池)并提供回调函数.假设我有一个API调用CompletableFuture.那是一个API调用阻塞吗?线程是否会被阻塞,直到它没有得到API的响应?(我知道主线程/ tomcat线程将是非阻塞的,但是CompletableFuture任务正在执行的线程呢?)
据我所知,单声道完全没有阻挡.
如果我错了,请详细说明并纠正我.
java reactive-programming project-reactor completable-future
我有这个:
Stream<CompletableFuture<List<Item>>>
Run Code Online (Sandbox Code Playgroud)
我怎样才能将它转换为
Stream<CompletableFuture<Item>>
Run Code Online (Sandbox Code Playgroud)
其中:第二个流由第一个流中每个列表内的每个项目组成。
我研究了一下thenCompose,但这解决了一个完全不同的问题,也称为“扁平化”。
如何以流方式高效地完成此操作,而不阻塞或过早消耗不必要的流项目?
这是迄今为止我最好的尝试:
ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;
@SuppressWarnings("unchecked")
CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
for(CompletableFuture<List<IncomingItem>> item: allFutures) {
queue.submit(item::get);
}
List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
queue.submit(() -> THE_END);
return THE_END;
});
queue.submit(() -> ender.get());
Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
boolean checkNext = true;
List<IncomingItem> next = null;
@Override
public boolean hasNext() {
if(checkNext) {
try { …Run Code Online (Sandbox Code Playgroud) java parallel-processing concurrency java-stream completable-future
我意识到我希望我们的API的消费者不必处理异常.或者更清楚一点,我想确保始终记录异常,但只有消费者才知道如何处理成功.我希望客户端能够处理异常,如果他们想要的话,File我无法返回它们.
注意:FileDownload是一个Supplier<File>
@Override
public CompletableFuture<File> processDownload( final FileDownload fileDownload ) {
Objects.requireNonNull( fileDownload );
fileDownload.setDirectory( getTmpDirectoryPath() );
CompletableFuture<File> future = CompletableFuture.supplyAsync( fileDownload, executorService );
future... throwable -> {
if ( throwable != null ) {
logError( throwable );
}
...
return null; // client won't receive file.
} );
return future;
}
Run Code Online (Sandbox Code Playgroud)
我真的不明白这些CompletionStage东西.我使用exception或handle?我会回归原来的未来还是他们回来的未来?
使用Spring Async与只返回CompletableFuture自己的优势是什么?
I have been working on Java CompletableFuture lately and found , we should always use customized Threadpool. With it, I found two ways of passing threadpool to existing code. Like below
This is my ThreadPool in configuration file
@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
return new ThreadPoolTaskExecutor();
}
Run Code Online (Sandbox Code Playgroud)
1. Passing existingThreadPool in argument.
@Autowired
@Qualifier("commonThreadPool")
TaskExecutor existingThreadPool;
CompletableFuture.runAsync(() -> executeTask(),existingThreadPool);
Run Code Online (Sandbox Code Playgroud)
2. Using async like below
@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}
Run Code Online (Sandbox Code Playgroud)
is there any …
我想在某个操作上提高后端 REST API 的性能,该操作按顺序轮询多个不同的外部 API 并收集它们的响应并将它们全部扁平化为一个响应列表。
最近刚刚了解了CompletableFutures,我决定试一试,并将该解决方案与仅将 my 更改stream为 a 的解决方案进行比较parallelStream。
这是用于基准测试的代码:
package com.alithya.platon;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class ConcurrentTest {
static final List<String> REST_APIS =
Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
MyTestUtil myTest = new MyTestUtil();
long millisBefore; // used to benchmark
@BeforeEach
void setUp() {
millisBefore = System.currentTimeMillis();
}
@AfterEach
void tearDown() {
System.out.printf("time taken …Run Code Online (Sandbox Code Playgroud) 我正在尝试解决多线程银行帐户问题*,而不使用锁,而是使用多版本并发控制。它正在工作。只是有点慢。我怎样才能加快速度?
(*) 我有 5 个用户,每个用户从 200 开始 - 每个人随机提取 100 并将 100 存入另一个用户拥有的另一个银行账户。我预计到运行结束时银行余额总计为 1000。不应损失或创造任何金钱。这部分适用于我下面的实现。
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
public class ConcurrentWithdrawer {
private Map<String, Integer> database = new HashMap<>();
private int transactionCount = 0;
private final List<Transaction> transactions = Collections.synchronizedList(new ArrayList<>());
public static void main(String[] args) {
try {
new ConcurrentWithdrawer().run();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static int getRandomNumberInRange(int min, int max) {
if (min >= …Run Code Online (Sandbox Code Playgroud) 我正在向用户返回以下响应
class FinalResponseDTO {
List<Service1ResponseDTO> service1ResponseDTO;
Long totalCount;
List<Service2ResponseDTO> service2ResponseDTO;
}
Run Code Online (Sandbox Code Playgroud)
截至目前,我正在进行三个连续调用来计算 this FinalResponseDTO,每个调用都可以独立于其他调用运行。我尝试制作三种不同CompletableFuture的:
CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
Run Code Online (Sandbox Code Playgroud)
如果我这样做CompletableFuture.allOf(future1, future2, future3).join();或我应该打电话吗CompletableFuture.allOf(future1, future2, future3).get();?即使我调用其中任何一个join,或者get我应该如何FinalResponseDTO从中构建。我对 Java 8 并发功能不熟悉,例如,CompletableFuture我很困惑,因为每个 future 的返回类型都不同,我应该如何获得所有这些 future 的组合响应,然后构造我的最终输出?
java ×10
java-8 ×3
asynchronous ×2
concurrency ×2
java-stream ×2
benchmarking ×1
netty ×1
nonblocking ×1
spring ×1
spring-boot ×1