标签: completable-future

为什么thenComposeAsync等待返回可以兑换

我写了一个人为的代码示例,它可能不是某人应该使用的代码,但我相信它应该有效.然而它反而陷入僵局.我已经阅读了这里描述的答案,但发现它们不足.

这是代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String argv[]) throws Exception {

        int nThreads = 1;
        Executor executor = Executors.newFixedThreadPool( nThreads );



        CompletableFuture.completedFuture(true)
            .thenComposeAsync((unused)->{

                System.err.println("About to enqueue task");
                CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
                executor.execute(() -> {

                    // pretend this is some really expensive computation done asynchronously

                    System.err.println("Inner task");
                    innerFuture.complete(true);
                });
                System.err.println("Task enqueued");

                return innerFuture;
            }, executor).get();

        System.err.println("All done");
        System.exit(0);

    }

}
Run Code Online (Sandbox Code Playgroud)

这打印:

即将入队任务

任务排队

然后它挂了.它已陷入僵局,因为执行程序只有一个线程,并且它正在等待innerFuture变为可兑换.为什么它的返回值"thenComposeAsync"块成为可兑换的,而不是返回仍然不完整的未来,并在执行释放它的线程?

这感觉完全不直观,javadocs并没有真正帮助.我是否从根本上误解了CompletionStages的工作原理?或者这是实施中的错误?

java java-8 completable-future

6
推荐指数
1
解决办法
894
查看次数

在期货清单上流式传输的最有效方式

我通过流式传输对象列表来调用异步客户端方法.该方法返回Future.

迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)?

注意:异步客户端仅返回Future not CompletableFuture.

以下是代码:

List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
    .map((object) -> {
        /* calling an async client returning a Future<Object> */ })
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

java concurrency future java-8 completable-future

6
推荐指数
1
解决办法
4363
查看次数

Spring 如何从返回 CompletableFuture 对象的端点获取结果?

在下面的代码中,当端点getPerson被命中时,响应将是一个 Person 类型的 JSON。Spring 如何转换CompletableFuture<Person>Person

@RestController
public class PersonController {

    @Autowired
    private PersonService personService;


    @GetMapping("/persons/{personId}" )
    public CompletableFuture<Person> getPerson(@PathVariable("personId") Integer personId) {

        return CompletableFuture.supplyAsync(() -> personService.getPerson(personId));
    }
}
Run Code Online (Sandbox Code Playgroud)

java spring spring-mvc java-8 completable-future

6
推荐指数
1
解决办法
2113
查看次数

在 Java 中从 C++ 复制延迟/异步启动策略

在 C++ 中,您可以使用延迟或异步启动策略启动线程。有没有办法在 Java 中复制此功能?

auto T1 = std::async(std::launch::deferred, doSomething());
auto T2 = std::async(std::launch::async, doSomething()); 
Run Code Online (Sandbox Code Playgroud)

每个的描述——

异步:

如果设置了 async 标志,则 async 在新的执行线程上执行可调用对象 f(所有线程局部初始化),除非函数 f 返回值或抛出异常,否则它会存储在可访问的共享状态通过 std::future 异步返回给调用者。

延期:

如果设置了延迟标志,则 async 转换 f 和 args... 与 std::thread 构造函数相同,但不会产生新的执行线程。相反,执行惰性求值:第一次调用 std::future 上的非定时等待函数,异步返回给调用者将导致 f 的副本与 args 的副本一起被调用(作为右值)。 . (也作为右值传递)在当前线程(它不必是最初调用 std::async 的线程)。结果或异常被置于与未来关联的共享状态中,然后才准备就绪。对同一 std::future 的所有进一步访问将立即返回结果。

有关详细信息,请参阅文档

java multithreading asynchronous future completable-future

6
推荐指数
2
解决办法
154
查看次数

用于 Java CompletableFuture 组合的线程?

我开始熟悉 JavaCompletableFuture组合,使用过 JavaScript 承诺。基本上,组合只是在指定的执行器上安排了链式命令。但是我不确定在执行组合时哪个线程正在运行。

假设我有两个执行者,executor1并且executor2;为简单起见,假设它们是单独的线程池。我安排了一个CompletableFuture(使用非常松散的描述):

CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);
Run Code Online (Sandbox Code Playgroud)

然后,当做到这一点我转换FooBar使用第二执行人:

CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);
Run Code Online (Sandbox Code Playgroud)

我知道getFoo()将从executor1线程池中的线程调用。我知道fooToBar()将从executor2线程池中的线程调用。

但是什么线程用于实际的组合,即在getFoo()完成和futureFoo()完成之后;但之前fooToBar()命令被安排在executor2换句话说,哪个线程实际运行代码以在第二个执行器上调度第二个命令?

调度是否作为executor1调用的同一线程的一部分执行getFoo()?如果是这样,这个可完成的未来组合是否等同于我fooToBar()executor1任务的第一个命令中自己手动安排?

java parallel-processing executor completable-future

6
推荐指数
2
解决办法
319
查看次数

CompletableFuture 是否保证运行新线程?

给定一些任意的上下文(例如 Junit 单元测试,而不是特定的,不一定是“主”线程)。

像这样的代码必须引入至少 2 个线程吗?

public static void main (String[] args)
{
    CompletableFuture<Void> s = new CompletableFuture<>();
    CompletableFuture<Void> f = new CompletableFuture<>();
    
    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        try{    
          System.out.println(Thread.currentThread().getId());
          CompletableFuture<String> update =
          CompletableFuture.supplyAsync(
            () -> {
              String ans = null;
              try {
                System.out.println(Thread.currentThread().getId());
                ans = "Hello";
              } catch (Exception e) {
                ans = e.toString();
              } finally {
                s.complete(null);
                return ans;
              }
            });
          s.get();
          System.out.println(s.isDone());
        } catch (Exception e) {
            System.out.println("Some error");
            return null;
        }
        return null;
    });
    
    System.out.println(f.isDone()); …
Run Code Online (Sandbox Code Playgroud)

java multithreading thread-safety completable-future

6
推荐指数
1
解决办法
85
查看次数

从 CompletableFuture.allof() 获取单独的结果

我有一个类,它使用 CompletableFutures 向两个依赖服务发出并发请求。

我的代码如下所示:

@Builder
@Slf4j
public class TestClass {

    @NonNull private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    @NonNull private final dependency1Client;
    @NonNull private final dependency2Client;

    public void myMethod() {

        RequestObject1 firstDependencyRequest = RequestObject1.builder()
                .attribute1("someValue")
                .attribute2("secondValue");

        CompletableFuture<ResultStructure1> future1 = CompletableFuture.supplyAsync(() -> dependency1Client.call(firstDependencyRequest), threadPool);
        RequestObject2 secondDependencyRequest = RequestObject2.builder()
                .attribute1("someValue")
                .attribute2("secondValue");

        CompletableFuture<ResultStructure2> future2 = CompletableFuture.supplyAsync(() -> dependency2Client.call(secondDependencyRequest), threadPool);

        try {
            CompletableFuture finalFuture = CompletableFuture.allOf(future1, future2);

        } catch (ExecutionException|InterruptedException e) {
            log.error("Exception calling dependency", e);
            throw new RuntimeException(e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我需要对依赖服务的两次调用的结果。如何在不执行阻塞调用的情况下获取它们?我最初以为我会执行 future1 .get(),但这是一个阻塞调用,我必须等到获得第一个 …

java concurrency completable-future

5
推荐指数
1
解决办法
4405
查看次数

使用 CompletableFuture 抛出已检查的异常

Stackoverflow 包含多个有关将检查异常与CompletableFuture.

这里有一些例子:

虽然一些答案暗示使用CompletableFuture.completeExceptionally()他们的方法会导致用户代码难以阅读。

我将利用这个空间提供一个替代解决方案,以提高可读性。

请注意,这个问题特定于 CompletableFuture。这使我们能够提供不更普遍地扩展到 lambda 表达式的解决方案。

java completable-future

5
推荐指数
1
解决办法
5028
查看次数

具有 CompletableFuture 的非阻塞异步 Jersey JAX-RS

我正在研究 Jersey,我在一本书中看到您可以使用 CompletableFuture (和 CompletitionStage)以非阻塞 IO 方式调用您的 API。

但是当我用Postman调用API时,我总是得到500。

如果我调试代码,我会发现这些方法被正确调用。

第一个 GET 方法是同步的并且可以正确工作。第二次和第三次返回错误500

我缺少什么?

@Path("/hello")
public class HelloController {

  @GET
  @Path("/first")
  @Produces(MediaType.TEXT_PLAIN)
  public String first() {
    return "It works";
  }

  @GET
  @Path("/second")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletionStage<Response> second() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }

  @GET
  @Path("/third")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletableFuture<Response> third() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }
}
Run Code Online (Sandbox Code Playgroud)

java asynchronous jax-rs jersey completable-future

5
推荐指数
1
解决办法
3061
查看次数

下面CompletableFuture示例中join的调用是否会阻塞进程

我试图理解 CompletableFutures 和返回已完成的 future 的调用链,我创建了下面的示例,它模拟了对数据库的两次调用。

第一个方法应该提供一个包含 userId 列表的完整 future,然后我需要调用另一个提供 userId 的方法来获取用户(在本例中为字符串)。

总结一下:

  1. 获取 id
  2. 获取与这些 id 对应的用户列表。

我创建了简单的方法来模拟睡眠线程的响应。请检查下面的代码

public class PipelineOfTasksExample {

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> …
Run Code Online (Sandbox Code Playgroud)

java nonblocking completable-future

5
推荐指数
1
解决办法
6662
查看次数