标签: completable-future

使用 CompletableFuture 抛出已检查的异常

Stackoverflow 包含多个有关将检查异常与CompletableFuture.

这里有一些例子:

虽然一些答案暗示使用CompletableFuture.completeExceptionally()他们的方法会导致用户代码难以阅读。

我将利用这个空间提供一个替代解决方案,以提高可读性。

请注意,这个问题特定于 CompletableFuture。这使我们能够提供不更普遍地扩展到 lambda 表达式的解决方案。

java completable-future

5
推荐指数
1
解决办法
5028
查看次数

具有 CompletableFuture 的非阻塞异步 Jersey JAX-RS

我正在研究 Jersey,我在一本书中看到您可以使用 CompletableFuture (和 CompletitionStage)以非阻塞 IO 方式调用您的 API。

但是当我用Postman调用API时,我总是得到500。

如果我调试代码,我会发现这些方法被正确调用。

第一个 GET 方法是同步的并且可以正确工作。第二次和第三次返回错误500

我缺少什么?

@Path("/hello")
public class HelloController {

  @GET
  @Path("/first")
  @Produces(MediaType.TEXT_PLAIN)
  public String first() {
    return "It works";
  }

  @GET
  @Path("/second")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletionStage<Response> second() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }

  @GET
  @Path("/third")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletableFuture<Response> third() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }
}
Run Code Online (Sandbox Code Playgroud)

java asynchronous jax-rs jersey completable-future

5
推荐指数
1
解决办法
3061
查看次数

CompletableFuture 对象数组的 Null 检查

我正在将 CompletableFuture 用于我的一项服务,如下 -

CompletableFuture<Employee>[] employeeDetails =
        empIds.stream().map(empId ->
            employeeService.employeeDetails(Integer.valueOf(empId))).toArray(CompletableFuture[]::new);
Run Code Online (Sandbox Code Playgroud)

EmployeeService 内部调用返回员工详细信息的 API。

问题是,当该 API 返回 null 或任何异常时,响应将为 null。当我检查 null 时,即使employeeDetails 数组为 null 并且其值也为 null 并且我得到 Null 指针,它也始终为 false。

我正在检查 null 作为 -

if(employeeDetails != null && employeeDetails.length > 0){
   //This condition always true even if null values or null array.
   CompletableFuture<Void> allEmployeeDetails = CompletableFuture.allOf(employeeDetails); // Here I am getting NullPointerException
}
Run Code Online (Sandbox Code Playgroud)

我在这里犯了任何错误,或者 CompletableFuture 数组需要处理任何特殊检查。

java java-8 completable-future

5
推荐指数
1
解决办法
8431
查看次数

下面CompletableFuture示例中join的调用是否会阻塞进程

我试图理解 CompletableFutures 和返回已完成的 future 的调用链,我创建了下面的示例,它模拟了对数据库的两次调用。

第一个方法应该提供一个包含 userId 列表的完整 future,然后我需要调用另一个提供 userId 的方法来获取用户(在本例中为字符串)。

总结一下:

  1. 获取 id
  2. 获取与这些 id 对应的用户列表。

我创建了简单的方法来模拟睡眠线程的响应。请检查下面的代码

public class PipelineOfTasksExample {

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> …
Run Code Online (Sandbox Code Playgroud)

java nonblocking completable-future

5
推荐指数
1
解决办法
6662
查看次数

可完成的未来。处理业务“异常”的最佳方法是什么?

我刚刚开始熟悉 Java 的 CompletableFuture 工具。我创建了一个小玩具应用程序来模拟几乎所有开发人员都会遇到的一些经常性用例。

在这个例子中,我只想将一个东西保存在数据库中,但在这样做之前我想检查该东西是否已经保存。

如果该事物已经在数据库中,则流程(可完成的未来链)应该停止并且不保存该事物。我正在做的是抛出一个异常,这样最终我就可以处理它并向服务的客户端提供一个好的消息,以便他知道发生了什么。

这是我到目前为止所尝试过的:

首先是尝试保存事物的代码,如果事物已在表中,则抛出错误:

repository
        .query(thing.getId())
        .thenCompose(
            mayBeThing -> {
              if (mayBeThing.isDefined()) throw new CompletionException(new ThingAlreadyExists());
              else return repository.insert(new ThingDTO(thing.getId(), thing.getName()));
Run Code Online (Sandbox Code Playgroud)

这是我正在尝试运行的测试:

    CompletableFuture<Integer> eventuallyMayBeThing =
        service.save(thing).thenCompose(i -> service.save(thing));
    try {
      eventuallyMayBeThing.get();
    } catch (CompletionException ce) {
      System.out.println("Completion exception " + ce.getMessage());
      try {
        throw ce.getCause();
      } catch (ThingAlreadyExist tae) {
        assert (true);
      } catch (Throwable t) {
        throw new AssertionError(t);
      }
    }
Run Code Online (Sandbox Code Playgroud)

我从这个响应中采用了这种方式:从 CompletableFuture 抛出异常(投票最多的答案的第一部分)。

然而,这是行不通的。确实被抛出ThingAlreadyExist,但它从未被我的 try catch 块处理。我的意思是,这个:

catch (CompletionException …
Run Code Online (Sandbox Code Playgroud)

java completable-future

5
推荐指数
1
解决办法
1683
查看次数

在 Spring Boot 中使用 @Async 注解中的 CompletableFuture.supplyAsync 和 CompletableFuture.completedFuture

我有以下方法:

@EnableAsync
@Service
Class MyService{ 

private String processRequest() {
        log.info("Start processing request");

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        log.info("Completed processing request");
        return RESULT;
    }    

@Async
public CompletableFuture<String> getSupplyAsyncResult(){
    CompletableFuture<String> future
            = CompletableFuture.supplyAsync(this::processRequest);
    return future;
}

@Async
public CompletableFuture<String> getCompletedFutureResult(){
    CompletableFuture<String> future
            = CompletableFuture.supplyAsync(this::processRequest);
    return future;
}
Run Code Online (Sandbox Code Playgroud)

以及控制器中的以下端点:

   @RequestMapping(path = "/asyncSupplyAsync", method = RequestMethod.GET)
    public CompletableFuture<String> getValueAsyncUsingCompletableFuture() {
        log.info("Request received");
        CompletableFuture<String> completableFuture
                = myService.getSupplyAsyncResult();
        log.info("Servlet thread released");
        return completableFuture;
    }
Run Code Online (Sandbox Code Playgroud)

   @RequestMapping(path = "/asyncCompletable", method …
Run Code Online (Sandbox Code Playgroud)

java asynchronous future spring-boot completable-future

5
推荐指数
1
解决办法
4970
查看次数

缩减列表&lt;CompletableFuture&lt;T&gt;&gt;

何时ints给出:

List<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

使用Java Stream API,我们可以减少它们

MyValue myvalue = ints
        .parallelStream()
        .map(x -> toMyValue(x))
        .reduce((t, t2) -> t.combine(t2))
        .get();
Run Code Online (Sandbox Code Playgroud)

在这个例子中,对我来说重要的是......

  • 项目将在多个线程中减少
  • 早期映射的项目将提前减少
  • 并非所有结果toMyValue()都会同时加载

现在我想通过API做同样的处理CompletableFuture

为了做地图,我做了:

List<CompeletableFuture<MyValue>> myValueFutures = ints
        .stream()
        .map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x), MY_THREAD_POOL))
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

现在我不知道如何减少List<CompeletableFuture<MyValue>> myValueFutures单身MyValue

并行流提供了方便的 API,但由于这些问题我不想使用 Stream API:

  • 并行流在处理过程中很难停止阶段。
  • 当某些worker被IO阻塞时,并行流的活动worker计数可能会超过并行度。这有助于最大限度地提高 CPU 利用率,但可能会出现内存开销(甚至 OOM)。

有什么办法可以减少 CompetableFutures 吗?没有流减少API的一一?

java java-stream completable-future

5
推荐指数
1
解决办法
1917
查看次数

如何使用 CompletableFuture 对 ArrayList 中的一批元素执行操作?

Java版本:11

我有对象列表,我想对它们执行某些操作,其中一个操作取决于另一个操作的结果。为了以异步方式实现此工作流程,我正在使用CompletableFuture.

目前,我正在通过将列表分区为子列表并将每个子列表赋予 来实现此目的CompletableFuture,因此线程池中的每个线程都可以开始处理该子列表。

我使用和工作的上述方法的代码是:

List<SomeObject> someObjectList // original list

List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);

partitionList.forEach(subList -> {
    CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});

public static List<String> firstOperation(List<SomeObject> subList){
        //perform operation
        return List<String>;
}

public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
        //perform operation
        //print results.
}

Run Code Online (Sandbox Code Playgroud)

这里的问题是对原始列表进行分区,

因为如果我的原始列表有 10 万条记录,分区大小为 100(这意味着我希望每个子列表中有 100 个项目),我将有 1000 个子列表对象,每个对象包含 100 个元素,考虑到内存中有这么多对象,这可能不太好,此外,如果分区大小是用户控制/配置控制器,则较小的分区大小将导致子列表对象数量过多。

而不是对原始列表进行分区,

  1. 我想要原始列表。(比如说 100 个元素)
  2. 在原始列表上有一个 startIndex 和 endIndex(比如 0 到 9、10 到 19...)
  3. 并将每个批次分配给一个线程,在带有 CompletableFuture 的线程池中
  4. 所以这个线程可以执行这两个操作。 …

java multithreading java-8 completable-future

5
推荐指数
1
解决办法
736
查看次数

对可完成的未来的测试总是过去

我有以下测试应始终失败:

@Test
public void testCompletable() {
    CompletableFuture.completedFuture(0)
        .thenAccept(a -> {
            org.junit.Assert.assertTrue(1==0);
        });
}
Run Code Online (Sandbox Code Playgroud)

而这项测试总是成功的.如何才能使此测试失败?

java junit java-8 completable-future

4
推荐指数
1
解决办法
929
查看次数

如何捕获CompletableFuture的whenCompleteAsync调用中抛出的RejectedExecutionException?

下面的示例代码我正在注入一个biconsumer睡眠时间为100毫秒,作为一组可完成的未来的完成动作.我whenCompleteAsync通过单独executorService使用来使用方法.executorService是一个ThreadPoolExecutor与芯池大小5,最大尺寸5和1队列的长度.

public class CompleteTest {
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        ArrayList<CompletableFuture<String>> list = new ArrayList<>();

        for (int i = 0; i <100; i++) {
            CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
            stringCompletableFuture.whenCompleteAsync((e, a) -> {
                System.out.println("Complete " + e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {e1.printStackTrace();}
            }, executorService);

            list.add(stringCompletableFuture);
        }

        for (int i = 0; i < list.size(); i++) { …
Run Code Online (Sandbox Code Playgroud)

java future executorservice java-8 completable-future

4
推荐指数
1
解决办法
941
查看次数