标签: completable-future

使用 Java 8 流和 CompletableFuture 的并行数据库调用

我想用 Java 8 流复制和并行化以下行为:

for (animal : animalList) {
        // find all other animals with the same breed
        Collection<Animal> queryResult = queryDatabase(animal.getBreed());

        if (animal.getSpecie() == cat) {
            catList.addAll(queryResult);
        } else {
            dogList.addAll(queryResult);
        }
}
Run Code Online (Sandbox Code Playgroud)

这是我到目前为止

final Executor queryExecutor =
        Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
                new ThreadFactory(){
                    public Thread newThread(Runnable r){
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    }
                });

List<CompletableFuture<Collection<Animal>>> listFutureResult =  animalList.stream()
        .map(animal -> CompletableFuture.supplyAsync(
                () -> queryDatabase(animal.getBreed()), queryExecutor))
        .collect(Collectors.toList());

List<Animal> = listFutureResult.stream()
        .map(CompletableFuture::join)
        .flatMap(subList -> subList.stream())
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

1 - 我不知道如何拆分流,以便获得 …

java java-stream completable-future

2
推荐指数
1
解决办法
6985
查看次数

调用supplyAsync时尝试并捕获

我是CompletableFuture的新手,我想调用MetadataLoginUtil :: login方法,它可以抛出异常.但是,尽管我已经"特别"编写,但下面的代码并未编译.它说我必须在try&catch中包装MetadataLoginUtil :: login'.

请指教.谢谢!

public void run() throws ConnectionException {
    CompletableFuture<Void> mt = CompletableFuture.supplyAsync(MetadataLoginUtil::login)
            .exceptionally(e -> {
                System.out.println(e);
                return null;
            })
            .thenAccept(e -> System.out.println(e));
}
Run Code Online (Sandbox Code Playgroud)

java java-8 completable-future

2
推荐指数
1
解决办法
3302
查看次数

Java 8 CompletableFuture从声明中返回一个子类型

抱歉这个糟糕的头衔,如果有人有更好的主意,我愿意接受建议.

我正在玩这个CompletableFuture,我偶然发现了一些奇怪的东西.

假设你有两个类:AB哪里B extends A,B也是一个子类型A.

现在,让我们宣布一个CompletableFuture:

CompletableFuture<A> promiseofA = CompletableFuture.supplyAsync(() -> new B());
Run Code Online (Sandbox Code Playgroud)

这是有效的,因为B它的子类型A符合声明CompletableFuture.现在,如果我想添加一个exceptionally步骤,那么我有一个编译异常:

CompletableFuture<A> promiseOfA = CompletableFuture.supplyAsync(() -> new B())
                                                   .exceptionally(ex -> new B());
Run Code Online (Sandbox Code Playgroud)

在这种情况下,Java正在抱怨,说:

Compilation error[ java.util.concurrent.CompletableFuture<B> cannot be converted to java.util.concurrent.CompletableFuture<A>]
Run Code Online (Sandbox Code Playgroud)

为什么没有这个excepionally步骤而没有它呢?

java completable-future

2
推荐指数
1
解决办法
182
查看次数

CompletableFuture:如何将函数应用于多个CompletableFutures?

假设我有3个下载,框架为可完成的期货:

    CompletableFuture<Doc> dl1 = CompletableFuture.supplyAsync(() -> download("file1"));
    CompletableFuture<Doc> dl2 = CompletableFuture.supplyAsync(() -> download("file2"));
    CompletableFuture<Doc> dl3 = CompletableFuture.supplyAsync(() -> download("file3"));
Run Code Online (Sandbox Code Playgroud)

然后所有这些都需要以相同的方式处理

    CompletableFuture<String> s1 = dl1.thenApply(Doc::getFilename);
    CompletableFuture<String> s2 = dl2.thenApply(Doc::getFilename);
    CompletableFuture<String> s3 = dl3.thenApply(Doc::getFilename);
Run Code Online (Sandbox Code Playgroud)

您可以想象要应用的多个功能,所有功能都是并行的.

根据DRY原则,这个例子似乎不合适.所以我正在寻找一种解决方案来定义仅执行3次并行执行的工作流程.

如何实现这一目标?

我尝试过allOf,但这有两个问题:1)它开始阻塞,2)返回类型只能run填充而不是处理它.

java concurrency java.util.concurrent java-8 completable-future

2
推荐指数
1
解决办法
965
查看次数

如何防止 CompletableFuture#whenComplete 在上下文线程中执行

我有以下代码:

ConcurrentHashMap taskMap= new ConcurrentHashMap();
....
taskMap.compute(key, (k, queue) -> {
        CompletableFuture<Void> future = (queue == null)
                ? CompletableFuture.runAsync(myTask, poolExecutor)
                : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
        //to prevent OutOfMemoryError in case if we will have too much keys
        future.whenComplete((r, e) -> taskMap.remove(key, future));            
        return future;
    });
Run Code Online (Sandbox Code Playgroud)

如果future已经完成的whenComplete函数参数在与调用相同的线程中compute调用,则此代码的问题。在此方法的主体中,我们从地图中删除条目。但是计算方法文档禁止这样做并且应用程序冻结。

我该如何解决这个问题?

java concurrency multithreading concurrenthashmap completable-future

2
推荐指数
1
解决办法
4132
查看次数

CompletableFuture | 然后应用Async vs thenCompose及其用例

我试图理解CompletableFuture,并遇到了2个方法,然后是ApplyAsync然后是Make.我试图了解这两者之间的区别.

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " Printing hello");
    return "Hello";
}).thenCompose((String s) -> {
  return CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread().getName() + " Adding abc");
      return "abc "+s;});
}).thenApplyAsync((String s) -> {
    System.out.println(Thread.currentThread().getName() + " Adding world");
    return s + " World";
}).thenApplyAsync((String s) -> {
    System.out.println(Thread.currentThread().getName() + " Adding name");
    if (false) {
       throw new RuntimeException("Oh no exception");
    }
    return s + " player!";
}).handle((String s, Throwable t) -> {
    System.out.println(s != null ? s : "BLANK"); …
Run Code Online (Sandbox Code Playgroud)

java java-8 completable-future

2
推荐指数
1
解决办法
3260
查看次数

Clojure用超时实现了期货的向量

我正在与返回a的Java API接口CompletableFuture.现在,如果我有这些数组[cf1 cf2 cf3 …],我怎么能给予他们所有一秒完成,并收集不论他们在一秒钟后产生的?

就像是:

  (def vec-of-cf [cf1 cf2 cf3])

  (get-all vec-of-cf 1000 ::timeout)

  ;; no more than 1 second later, I should have my vector of realized CompletableFuture, possibly holding a `::timeout` value if they did not have time to finish
Run Code Online (Sandbox Code Playgroud)

我认为这类似于Scala flatmap(?).

java asynchronous clojure completable-future

2
推荐指数
1
解决办法
148
查看次数

CompletableFuture.allOf()在单个期货之后未完成

如javadoc中所述,当我使用CompletableFuture.allOf()组合独立的可完成期货时,在将所有期货提供给该方法之后,它不能可靠地完成。例如:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable dummyTask = () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
            }
        };

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(dummyTask);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(dummyTask);
        CompletableFuture[] all = {f1, f2};
        f1.whenComplete((aVoid, throwable) -> System.out.println("Completed f1"));
        f2.whenComplete((aVoid, throwable) -> System.out.println("Completed f2"));
        CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
        allOf.whenComplete((aVoid, throwable) -> {
                    System.out.println("Completed allOf");
                }
        );
        allOf.join();
        System.out.println("Joined");
    }
}
Run Code Online (Sandbox Code Playgroud)

导致以下结果:

Completed f2
Joined
Completed allOf
Completed …
Run Code Online (Sandbox Code Playgroud)

java completable-future

2
推荐指数
1
解决办法
4031
查看次数

Completable Future发布什么时候会回线到线程池?

何时将CompletableFuture线程发回ThreadPool?是在调用get()方法之后还是在完成相关任务之后?

java threadpool java-8 completable-future

2
推荐指数
1
解决办法
383
查看次数

为什么我的 Thread.sleep 在 CompletableFuture 中结束我的进程?

我正在尝试创建一个 CompleteableFuture。我只是想注销一些语句。这是我的代码:

    static CompletableFuture<String> createFuture(String name) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Task execution started.");
                //Thread.sleep(2000);
                System.out.println("Task execution stopped.");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return name;
        });
    }

    static void start(Person person, List<Person> people) {
        CompletableFuture.allOf(
            createFuture("Bob")
        ).thenApply(s -> {
            return s;
        }).exceptionally(e -> {
            System.out.println(e);
            return null;
        });
    }
Run Code Online (Sandbox Code Playgroud)

一切正常,直到我取消注释Thread.sleep(2000)。取消注释时,进程将终止。它不会进入catch,也不会进入exceptionally。为什么?我错过了什么?我如何使这项工作?

java completable-future

2
推荐指数
1
解决办法
282
查看次数