Stackoverflow 包含多个有关将检查异常与CompletableFuture.
这里有一些例子:
虽然一些答案暗示使用CompletableFuture.completeExceptionally()他们的方法会导致用户代码难以阅读。
我将利用这个空间提供一个替代解决方案,以提高可读性。
请注意,这个问题特定于 CompletableFuture。这使我们能够提供不更普遍地扩展到 lambda 表达式的解决方案。
我正在研究 Jersey,我在一本书中看到您可以使用 CompletableFuture (和 CompletitionStage)以非阻塞 IO 方式调用您的 API。
但是当我用Postman调用API时,我总是得到500。
如果我调试代码,我会发现这些方法被正确调用。
第一个 GET 方法是同步的并且可以正确工作。第二次和第三次返回错误500。
我缺少什么?
@Path("/hello")
public class HelloController {
@GET
@Path("/first")
@Produces(MediaType.TEXT_PLAIN)
public String first() {
return "It works";
}
@GET
@Path("/second")
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<Response> second() {
return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
}
@GET
@Path("/third")
@Produces(MediaType.TEXT_PLAIN)
public CompletableFuture<Response> third() {
return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
}
}
Run Code Online (Sandbox Code Playgroud) 我正在将 CompletableFuture 用于我的一项服务,如下 -
CompletableFuture<Employee>[] employeeDetails =
empIds.stream().map(empId ->
employeeService.employeeDetails(Integer.valueOf(empId))).toArray(CompletableFuture[]::new);
Run Code Online (Sandbox Code Playgroud)
EmployeeService 内部调用返回员工详细信息的 API。
问题是,当该 API 返回 null 或任何异常时,响应将为 null。当我检查 null 时,即使employeeDetails 数组为 null 并且其值也为 null 并且我得到 Null 指针,它也始终为 false。
我正在检查 null 作为 -
if(employeeDetails != null && employeeDetails.length > 0){
//This condition always true even if null values or null array.
CompletableFuture<Void> allEmployeeDetails = CompletableFuture.allOf(employeeDetails); // Here I am getting NullPointerException
}
Run Code Online (Sandbox Code Playgroud)
我在这里犯了任何错误,或者 CompletableFuture 数组需要处理任何特殊检查。
我试图理解 CompletableFutures 和返回已完成的 future 的调用链,我创建了下面的示例,它模拟了对数据库的两次调用。
第一个方法应该提供一个包含 userId 列表的完整 future,然后我需要调用另一个提供 userId 的方法来获取用户(在本例中为字符串)。
总结一下:
我创建了简单的方法来模拟睡眠线程的响应。请检查下面的代码
public class PipelineOfTasksExample {
private Map<Long, String> db = new HashMap<>();
PipelineOfTasksExample() {
db.put(1L, "user1");
db.put(2L, "user2");
db.put(3L, "user3");
db.put(4L, "user4");
}
private CompletableFuture<List<Long>> returnUserIdsFromDb() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
}
private CompletableFuture<String> fetchById(Long id) {
CompletableFuture<String> …Run Code Online (Sandbox Code Playgroud) 我刚刚开始熟悉 Java 的 CompletableFuture 工具。我创建了一个小玩具应用程序来模拟几乎所有开发人员都会遇到的一些经常性用例。
在这个例子中,我只想将一个东西保存在数据库中,但在这样做之前我想检查该东西是否已经保存。
如果该事物已经在数据库中,则流程(可完成的未来链)应该停止并且不保存该事物。我正在做的是抛出一个异常,这样最终我就可以处理它并向服务的客户端提供一个好的消息,以便他知道发生了什么。
这是我到目前为止所尝试过的:
首先是尝试保存事物的代码,如果事物已在表中,则抛出错误:
repository
.query(thing.getId())
.thenCompose(
mayBeThing -> {
if (mayBeThing.isDefined()) throw new CompletionException(new ThingAlreadyExists());
else return repository.insert(new ThingDTO(thing.getId(), thing.getName()));
Run Code Online (Sandbox Code Playgroud)
这是我正在尝试运行的测试:
CompletableFuture<Integer> eventuallyMayBeThing =
service.save(thing).thenCompose(i -> service.save(thing));
try {
eventuallyMayBeThing.get();
} catch (CompletionException ce) {
System.out.println("Completion exception " + ce.getMessage());
try {
throw ce.getCause();
} catch (ThingAlreadyExist tae) {
assert (true);
} catch (Throwable t) {
throw new AssertionError(t);
}
}
Run Code Online (Sandbox Code Playgroud)
我从这个响应中采用了这种方式:从 CompletableFuture 抛出异常(投票最多的答案的第一部分)。
然而,这是行不通的。确实被抛出ThingAlreadyExist,但它从未被我的 try catch 块处理。我的意思是,这个:
catch (CompletionException …Run Code Online (Sandbox Code Playgroud) 我有以下方法:
@EnableAsync
@Service
Class MyService{
private String processRequest() {
log.info("Start processing request");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("Completed processing request");
return RESULT;
}
@Async
public CompletableFuture<String> getSupplyAsyncResult(){
CompletableFuture<String> future
= CompletableFuture.supplyAsync(this::processRequest);
return future;
}
@Async
public CompletableFuture<String> getCompletedFutureResult(){
CompletableFuture<String> future
= CompletableFuture.supplyAsync(this::processRequest);
return future;
}
Run Code Online (Sandbox Code Playgroud)
以及控制器中的以下端点:
@RequestMapping(path = "/asyncSupplyAsync", method = RequestMethod.GET)
public CompletableFuture<String> getValueAsyncUsingCompletableFuture() {
log.info("Request received");
CompletableFuture<String> completableFuture
= myService.getSupplyAsyncResult();
log.info("Servlet thread released");
return completableFuture;
}
Run Code Online (Sandbox Code Playgroud)
和
@RequestMapping(path = "/asyncCompletable", method …Run Code Online (Sandbox Code Playgroud) 何时ints给出:
List<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
使用Java Stream API,我们可以减少它们
MyValue myvalue = ints
.parallelStream()
.map(x -> toMyValue(x))
.reduce((t, t2) -> t.combine(t2))
.get();
Run Code Online (Sandbox Code Playgroud)
在这个例子中,对我来说重要的是......
toMyValue()都会同时加载现在我想通过API做同样的处理CompletableFuture。
为了做地图,我做了:
List<CompeletableFuture<MyValue>> myValueFutures = ints
.stream()
.map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x), MY_THREAD_POOL))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
现在我不知道如何减少List<CompeletableFuture<MyValue>> myValueFutures单身MyValue。
并行流提供了方便的 API,但由于这些问题我不想使用 Stream API:
有什么办法可以减少 CompetableFutures 吗?没有流减少API的一一?
Java版本:11
我有对象列表,我想对它们执行某些操作,其中一个操作取决于另一个操作的结果。为了以异步方式实现此工作流程,我正在使用CompletableFuture.
目前,我正在通过将列表分区为子列表并将每个子列表赋予 来实现此目的CompletableFuture,因此线程池中的每个线程都可以开始处理该子列表。
我使用和工作的上述方法的代码是:
List<SomeObject> someObjectList // original list
List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);
partitionList.forEach(subList -> {
CompletableFuture.supplyAsync(() -> firstOperation(subList), executorService)
.thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});
public static List<String> firstOperation(List<SomeObject> subList){
//perform operation
return List<String>;
}
public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
//perform operation
//print results.
}
Run Code Online (Sandbox Code Playgroud)
这里的问题是对原始列表进行分区,
因为如果我的原始列表有 10 万条记录,分区大小为 100(这意味着我希望每个子列表中有 100 个项目),我将有 1000 个子列表对象,每个对象包含 100 个元素,考虑到内存中有这么多对象,这可能不太好,此外,如果分区大小是用户控制/配置控制器,则较小的分区大小将导致子列表对象数量过多。
而不是对原始列表进行分区,
我有以下测试应始终失败:
@Test
public void testCompletable() {
CompletableFuture.completedFuture(0)
.thenAccept(a -> {
org.junit.Assert.assertTrue(1==0);
});
}
Run Code Online (Sandbox Code Playgroud)
而这项测试总是成功的.如何才能使此测试失败?
下面的示例代码我正在注入一个biconsumer睡眠时间为100毫秒,作为一组可完成的未来的完成动作.我whenCompleteAsync通过单独executorService使用来使用方法.executorService是一个ThreadPoolExecutor与芯池大小5,最大尺寸5和1队列的长度.
public class CompleteTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
ArrayList<CompletableFuture<String>> list = new ArrayList<>();
for (int i = 0; i <100; i++) {
CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>();
stringCompletableFuture.whenCompleteAsync((e, a) -> {
System.out.println("Complete " + e);
try {
Thread.sleep(100);
} catch (InterruptedException e1) {e1.printStackTrace();}
}, executorService);
list.add(stringCompletableFuture);
}
for (int i = 0; i < list.size(); i++) { …Run Code Online (Sandbox Code Playgroud) java ×10
java-8 ×4
asynchronous ×2
future ×2
java-stream ×1
jax-rs ×1
jersey ×1
junit ×1
nonblocking ×1
spring-boot ×1