我想用 Java 8 流复制和并行化以下行为:
for (animal : animalList) {
// find all other animals with the same breed
Collection<Animal> queryResult = queryDatabase(animal.getBreed());
if (animal.getSpecie() == cat) {
catList.addAll(queryResult);
} else {
dogList.addAll(queryResult);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我到目前为止
final Executor queryExecutor =
Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
new ThreadFactory(){
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
List<CompletableFuture<Collection<Animal>>> listFutureResult = animalList.stream()
.map(animal -> CompletableFuture.supplyAsync(
() -> queryDatabase(animal.getBreed()), queryExecutor))
.collect(Collectors.toList());
List<Animal> = listFutureResult.stream()
.map(CompletableFuture::join)
.flatMap(subList -> subList.stream())
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
1 - 我不知道如何拆分流,以便获得 …
我是CompletableFuture的新手,我想调用MetadataLoginUtil :: login方法,它可以抛出异常.但是,尽管我已经"特别"编写,但下面的代码并未编译.它说我必须在try&catch中包装MetadataLoginUtil :: login'.
请指教.谢谢!
public void run() throws ConnectionException {
CompletableFuture<Void> mt = CompletableFuture.supplyAsync(MetadataLoginUtil::login)
.exceptionally(e -> {
System.out.println(e);
return null;
})
.thenAccept(e -> System.out.println(e));
}
Run Code Online (Sandbox Code Playgroud) 抱歉这个糟糕的头衔,如果有人有更好的主意,我愿意接受建议.
我正在玩这个CompletableFuture,我偶然发现了一些奇怪的东西.
假设你有两个类:A和B哪里B extends A,B也是一个子类型A.
现在,让我们宣布一个CompletableFuture:
CompletableFuture<A> promiseofA = CompletableFuture.supplyAsync(() -> new B());
Run Code Online (Sandbox Code Playgroud)
这是有效的,因为B它的子类型A符合声明CompletableFuture.现在,如果我想添加一个exceptionally步骤,那么我有一个编译异常:
CompletableFuture<A> promiseOfA = CompletableFuture.supplyAsync(() -> new B())
.exceptionally(ex -> new B());
Run Code Online (Sandbox Code Playgroud)
在这种情况下,Java正在抱怨,说:
Compilation error[ java.util.concurrent.CompletableFuture<B> cannot be converted to java.util.concurrent.CompletableFuture<A>]
Run Code Online (Sandbox Code Playgroud)
为什么没有这个excepionally步骤而没有它呢?
假设我有3个下载,框架为可完成的期货:
CompletableFuture<Doc> dl1 = CompletableFuture.supplyAsync(() -> download("file1"));
CompletableFuture<Doc> dl2 = CompletableFuture.supplyAsync(() -> download("file2"));
CompletableFuture<Doc> dl3 = CompletableFuture.supplyAsync(() -> download("file3"));
Run Code Online (Sandbox Code Playgroud)
然后所有这些都需要以相同的方式处理
CompletableFuture<String> s1 = dl1.thenApply(Doc::getFilename);
CompletableFuture<String> s2 = dl2.thenApply(Doc::getFilename);
CompletableFuture<String> s3 = dl3.thenApply(Doc::getFilename);
Run Code Online (Sandbox Code Playgroud)
您可以想象要应用的多个功能,所有功能都是并行的.
根据DRY原则,这个例子似乎不合适.所以我正在寻找一种解决方案来定义仅执行3次并行执行的工作流程.
如何实现这一目标?
我尝试过allOf,但这有两个问题:1)它开始阻塞,2)返回类型只能run填充而不是处理它.
java concurrency java.util.concurrent java-8 completable-future
我有以下代码:
ConcurrentHashMap taskMap= new ConcurrentHashMap();
....
taskMap.compute(key, (k, queue) -> {
CompletableFuture<Void> future = (queue == null)
? CompletableFuture.runAsync(myTask, poolExecutor)
: queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
//to prevent OutOfMemoryError in case if we will have too much keys
future.whenComplete((r, e) -> taskMap.remove(key, future));
return future;
});
Run Code Online (Sandbox Code Playgroud)
如果future已经完成的whenComplete函数参数在与调用相同的线程中compute调用,则此代码的问题。在此方法的主体中,我们从地图中删除条目。但是计算方法文档禁止这样做并且应用程序冻结。
我该如何解决这个问题?
java concurrency multithreading concurrenthashmap completable-future
我试图理解CompletableFuture,并遇到了2个方法,然后是ApplyAsync然后是Make.我试图了解这两者之间的区别.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " Printing hello");
return "Hello";
}).thenCompose((String s) -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " Adding abc");
return "abc "+s;});
}).thenApplyAsync((String s) -> {
System.out.println(Thread.currentThread().getName() + " Adding world");
return s + " World";
}).thenApplyAsync((String s) -> {
System.out.println(Thread.currentThread().getName() + " Adding name");
if (false) {
throw new RuntimeException("Oh no exception");
}
return s + " player!";
}).handle((String s, Throwable t) -> {
System.out.println(s != null ? s : "BLANK"); …Run Code Online (Sandbox Code Playgroud) 我正在与返回a的Java API接口CompletableFuture.现在,如果我有这些数组[cf1 cf2 cf3 …],我怎么能给予他们所有一秒完成,并收集不论他们在一秒钟后产生的?
就像是:
(def vec-of-cf [cf1 cf2 cf3])
(get-all vec-of-cf 1000 ::timeout)
;; no more than 1 second later, I should have my vector of realized CompletableFuture, possibly holding a `::timeout` value if they did not have time to finish
Run Code Online (Sandbox Code Playgroud)
我认为这类似于Scala flatmap(?).
如javadoc中所述,当我使用CompletableFuture.allOf()组合独立的可完成期货时,在将所有期货提供给该方法之后,它不能可靠地完成。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable dummyTask = () -> {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
}
};
CompletableFuture<Void> f1 = CompletableFuture.runAsync(dummyTask);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(dummyTask);
CompletableFuture[] all = {f1, f2};
f1.whenComplete((aVoid, throwable) -> System.out.println("Completed f1"));
f2.whenComplete((aVoid, throwable) -> System.out.println("Completed f2"));
CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
allOf.whenComplete((aVoid, throwable) -> {
System.out.println("Completed allOf");
}
);
allOf.join();
System.out.println("Joined");
}
}
Run Code Online (Sandbox Code Playgroud)
导致以下结果:
Completed f2
Joined
Completed allOf
Completed …Run Code Online (Sandbox Code Playgroud) 何时将CompletableFuture线程发回ThreadPool?是在调用get()方法之后还是在完成相关任务之后?
我正在尝试创建一个 CompleteableFuture。我只是想注销一些语句。这是我的代码:
static CompletableFuture<String> createFuture(String name) {
return CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Task execution started.");
//Thread.sleep(2000);
System.out.println("Task execution stopped.");
} catch (Exception e) {
e.printStackTrace();
}
return name;
});
}
static void start(Person person, List<Person> people) {
CompletableFuture.allOf(
createFuture("Bob")
).thenApply(s -> {
return s;
}).exceptionally(e -> {
System.out.println(e);
return null;
});
}
Run Code Online (Sandbox Code Playgroud)
一切正常,直到我取消注释Thread.sleep(2000)。取消注释时,进程将终止。它不会进入catch,也不会进入exceptionally。为什么?我错过了什么?我如何使这项工作?