我写了一个人为的代码示例,它可能不是某人应该使用的代码,但我相信它应该有效.然而它反而陷入僵局.我已经阅读了这里描述的答案,但发现它们不足.
这是代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class Test {
public static void main(String argv[]) throws Exception {
int nThreads = 1;
Executor executor = Executors.newFixedThreadPool( nThreads );
CompletableFuture.completedFuture(true)
.thenComposeAsync((unused)->{
System.err.println("About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
// pretend this is some really expensive computation done asynchronously
System.err.println("Inner task");
innerFuture.complete(true);
});
System.err.println("Task enqueued");
return innerFuture;
}, executor).get();
System.err.println("All done");
System.exit(0);
}
}
Run Code Online (Sandbox Code Playgroud)
这打印:
即将入队任务
任务排队
然后它挂了.它已陷入僵局,因为执行程序只有一个线程,并且它正在等待innerFuture变为可兑换.为什么它的返回值"thenComposeAsync"块成为可兑换的,而不是返回仍然不完整的未来,并在执行释放它的线程?
这感觉完全不直观,javadocs并没有真正帮助.我是否从根本上误解了CompletionStages的工作原理?或者这是实施中的错误?
我通过流式传输对象列表来调用异步客户端方法.该方法返回Future.
迭代调用后返回的Futures列表的最佳方法是什么(以便处理那些首先出现的Future)?
注意:异步客户端仅返回Future not CompletableFuture.
以下是代码:
List<Future<Object>> listOfFuture = objectsToProcess.parallelStream()
.map((object) -> {
/* calling an async client returning a Future<Object> */ })
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud) 在下面的代码中,当端点getPerson被命中时,响应将是一个 Person 类型的 JSON。Spring 如何转换CompletableFuture<Person>为Person?
@RestController
public class PersonController {
@Autowired
private PersonService personService;
@GetMapping("/persons/{personId}" )
public CompletableFuture<Person> getPerson(@PathVariable("personId") Integer personId) {
return CompletableFuture.supplyAsync(() -> personService.getPerson(personId));
}
}
Run Code Online (Sandbox Code Playgroud) 在 C++ 中,您可以使用延迟或异步启动策略启动线程。有没有办法在 Java 中复制此功能?
auto T1 = std::async(std::launch::deferred, doSomething());
auto T2 = std::async(std::launch::async, doSomething());
Run Code Online (Sandbox Code Playgroud)
每个的描述——
异步:
如果设置了 async 标志,则 async 在新的执行线程上执行可调用对象 f(所有线程局部初始化),除非函数 f 返回值或抛出异常,否则它会存储在可访问的共享状态通过 std::future 异步返回给调用者。
延期:
如果设置了延迟标志,则 async 转换 f 和 args... 与 std::thread 构造函数相同,但不会产生新的执行线程。相反,执行惰性求值:第一次调用 std::future 上的非定时等待函数,异步返回给调用者将导致 f 的副本与 args 的副本一起被调用(作为右值)。 . (也作为右值传递)在当前线程(它不必是最初调用 std::async 的线程)。结果或异常被置于与未来关联的共享状态中,然后才准备就绪。对同一 std::future 的所有进一步访问将立即返回结果。
有关详细信息,请参阅文档。
我开始熟悉 JavaCompletableFuture组合,使用过 JavaScript 承诺。基本上,组合只是在指定的执行器上安排了链式命令。但是我不确定在执行组合时哪个线程正在运行。
假设我有两个执行者,executor1并且executor2;为简单起见,假设它们是单独的线程池。我安排了一个CompletableFuture(使用非常松散的描述):
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);
Run Code Online (Sandbox Code Playgroud)
然后,当做到这一点我转换Foo到Bar使用第二执行人:
CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);
Run Code Online (Sandbox Code Playgroud)
我知道getFoo()将从executor1线程池中的线程调用。我知道fooToBar()将从executor2线程池中的线程调用。
但是什么线程用于实际的组合,即在getFoo()完成和futureFoo()完成之后;但之前的fooToBar()命令被安排在executor2?换句话说,哪个线程实际运行代码以在第二个执行器上调度第二个命令?
调度是否作为executor1调用的同一线程的一部分执行getFoo()?如果是这样,这个可完成的未来组合是否等同于我fooToBar()在executor1任务的第一个命令中自己手动安排?
给定一些任意的上下文(例如 Junit 单元测试,而不是特定的,不一定是“主”线程)。
像这样的代码必须引入至少 2 个线程吗?
public static void main (String[] args)
{
CompletableFuture<Void> s = new CompletableFuture<>();
CompletableFuture<Void> f = new CompletableFuture<>();
CompletableFuture<Void> someContext = CompletableFuture.supplyAsync(() ->
{
try{
System.out.println(Thread.currentThread().getId());
CompletableFuture<String> update =
CompletableFuture.supplyAsync(
() -> {
String ans = null;
try {
System.out.println(Thread.currentThread().getId());
ans = "Hello";
} catch (Exception e) {
ans = e.toString();
} finally {
s.complete(null);
return ans;
}
});
s.get();
System.out.println(s.isDone());
} catch (Exception e) {
System.out.println("Some error");
return null;
}
return null;
});
System.out.println(f.isDone()); …Run Code Online (Sandbox Code Playgroud) 我有一个类,它使用 CompletableFutures 向两个依赖服务发出并发请求。
我的代码如下所示:
@Builder
@Slf4j
public class TestClass {
@NonNull private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
@NonNull private final dependency1Client;
@NonNull private final dependency2Client;
public void myMethod() {
RequestObject1 firstDependencyRequest = RequestObject1.builder()
.attribute1("someValue")
.attribute2("secondValue");
CompletableFuture<ResultStructure1> future1 = CompletableFuture.supplyAsync(() -> dependency1Client.call(firstDependencyRequest), threadPool);
RequestObject2 secondDependencyRequest = RequestObject2.builder()
.attribute1("someValue")
.attribute2("secondValue");
CompletableFuture<ResultStructure2> future2 = CompletableFuture.supplyAsync(() -> dependency2Client.call(secondDependencyRequest), threadPool);
try {
CompletableFuture finalFuture = CompletableFuture.allOf(future1, future2);
} catch (ExecutionException|InterruptedException e) {
log.error("Exception calling dependency", e);
throw new RuntimeException(e);
}
}
}
Run Code Online (Sandbox Code Playgroud)
我需要对依赖服务的两次调用的结果。如何在不执行阻塞调用的情况下获取它们?我最初以为我会执行 future1 .get(),但这是一个阻塞调用,我必须等到获得第一个 …
Stackoverflow 包含多个有关将检查异常与CompletableFuture.
这里有一些例子:
虽然一些答案暗示使用CompletableFuture.completeExceptionally()他们的方法会导致用户代码难以阅读。
我将利用这个空间提供一个替代解决方案,以提高可读性。
请注意,这个问题特定于 CompletableFuture。这使我们能够提供不更普遍地扩展到 lambda 表达式的解决方案。
我正在研究 Jersey,我在一本书中看到您可以使用 CompletableFuture (和 CompletitionStage)以非阻塞 IO 方式调用您的 API。
但是当我用Postman调用API时,我总是得到500。
如果我调试代码,我会发现这些方法被正确调用。
第一个 GET 方法是同步的并且可以正确工作。第二次和第三次返回错误500。
我缺少什么?
@Path("/hello")
public class HelloController {
@GET
@Path("/first")
@Produces(MediaType.TEXT_PLAIN)
public String first() {
return "It works";
}
@GET
@Path("/second")
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<Response> second() {
return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
}
@GET
@Path("/third")
@Produces(MediaType.TEXT_PLAIN)
public CompletableFuture<Response> third() {
return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
}
}
Run Code Online (Sandbox Code Playgroud) 我试图理解 CompletableFutures 和返回已完成的 future 的调用链,我创建了下面的示例,它模拟了对数据库的两次调用。
第一个方法应该提供一个包含 userId 列表的完整 future,然后我需要调用另一个提供 userId 的方法来获取用户(在本例中为字符串)。
总结一下:
我创建了简单的方法来模拟睡眠线程的响应。请检查下面的代码
public class PipelineOfTasksExample {
private Map<Long, String> db = new HashMap<>();
PipelineOfTasksExample() {
db.put(1L, "user1");
db.put(2L, "user2");
db.put(3L, "user3");
db.put(4L, "user4");
}
private CompletableFuture<List<Long>> returnUserIdsFromDb() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
}
private CompletableFuture<String> fetchById(Long id) {
CompletableFuture<String> …Run Code Online (Sandbox Code Playgroud) java ×10
java-8 ×3
asynchronous ×2
concurrency ×2
future ×2
executor ×1
jax-rs ×1
jersey ×1
nonblocking ×1
spring ×1
spring-mvc ×1