如何杀死CompletableFuture相关的主题?

mas*_*ter 0 java multithreading future java-8 completable-future

我有检查CompletableFuture执行时间的方法.如果此类CompletableFuture执行的时间超过2秒,我想要终止此任务.但是,如果我没有执行CompletableFuture方法的控制线程,我该怎么办呢?

       final CompletableFuture<List<List<Student>>> responseFuture = new CompletableFuture<>();
responseFuture.supplyAsync(this::createAllRandomGroups)
        .thenAccept(this::printGroups)
        .exceptionally(throwable -> {
            throwable.printStackTrace();
            return null;
        });
Run Code Online (Sandbox Code Playgroud)

createAllRandomGroups()

private List<List<Student>> createAllRandomGroups() {
    System.out.println("XD");
    List<Student> allStudents = ClassGroupUtils.getActiveUsers();
    Controller controller = Controller.getInstance();
    List<List<Student>> groups = new ArrayList<>();
    int groupSize = Integer.valueOf(controller.getGroupSizeComboBox().getSelectionModel().getSelectedItem());
    int numberOfGroupsToGenerate = allStudents.size() / groupSize;
    int studentWithoutGroup = allStudents.size() % groupSize;
    if (studentWithoutGroup != 0) groups.add(this.getListOfStudentsWithoutGroup(allStudents, groupSize));
    for(int i = 0; i < numberOfGroupsToGenerate; i++) {
        boolean isGroupCreated = false;
        while (!isGroupCreated){
            Collections.shuffle(allStudents);
            List<Student> newGroup = this.createNewRandomGroupOfStudents(allStudents, groupSize);
            groups.add(newGroup);
            if (!DataManager.isNewGroupDuplicated(newGroup.toString())) {
                isGroupCreated = true;
                allStudents.removeAll(newGroup);
            }
        }
    }
    DataManager.saveGroupsToCache(groups);
    return groups;
}
Run Code Online (Sandbox Code Playgroud)

printGroups()

private void printGroups(List<List<Student>> lists) {
        System.out.println(lists);

    }
Run Code Online (Sandbox Code Playgroud)

此语句responseFuture.cancel(true);不会杀死responseFuture正在执行方法的线程.那么终止CompletableFuture线程最优雅的方法是什么?

Hol*_*ger 5

当您创建一系列CompletableFuture阶段时b = a.thenApply(function),这个方便的方法会创建不同组件的设置.基本上,这些组件互相引用a ? function ? b,因此完成a将触发评估,function其中将首先预先检查是否b仍未完成,然后评估您的功能并尝试完成b结果.

但是b它本身并不知道function或将评估它的线程.事实上,function并不是特别的b,任何人都可以打电话complete,completeExceptionallycancel从任何线程,第一个获胜.因此,completable在类名.

获得评估功能的线程的唯一方法是从一开始就控制它们,例如

ExecutorService myWorkers = Executors.newFixedThreadPool(2);

CompletableFuture<FinalResultType> future
    = CompletableFuture.supplyAsync(() -> generateInitialValue(), myWorkers)
                       .thenApplyAsync(v -> nextCalculation(v), myWorkers)
                       .thenApplyAsync(v -> lastCalculation(v), myWorkers);
future.whenComplete((x,y) -> myWorkers.shutdownNow());
Run Code Online (Sandbox Code Playgroud)

现在,future例如通过取消完成将确保该链不会触发新的评估,并进一步尝试中断正在进行的评估(如果有的话).

所以你可以实现超时,例如

try {
    try {
        FinalResultType result = future.get(2, TimeUnit.SECONDS);
        System.out.println("got "+result);
    }
    catch(TimeoutException ex) {
        if(future.cancel(true)) System.out.println("cancelled");
        else System.out.println("got "+future.get());
    }
}
catch(ExecutionException|InterruptedException ex) {
    ex.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)

并非由于线程池关闭而导致的任务拒绝可能导致某些中间期限永远不会完成,但对于这个阶段链,这是无关紧要的.重要的是,最后阶段future已经完成,这是有保证的,因为它的完成会触发关闭.