我有两个问题:1.Callable在Java 8中运行作为任务的最简单的规范形式是什么,捕获和处理结果?2.在下面的示例中,在所有任务完成之前,保持主进程打开的最佳/最简单/最清晰的方法是什么?
这是我到目前为止的例子 - 这是Java 8中最好的方法还是有更基本的东西?
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
public class SimpleTask implements Supplier<String> {
private SplittableRandom rand = new SplittableRandom();
final int id;
SimpleTask(int id) { this.id = id; }
@Override
public String get() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(50, 300));
} catch(InterruptedException e) {
System.err.println("Interrupted");
}
return "Completed " + id + " on " +
Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
for(int i = 0; i < 10; i++)
CompletableFuture.supplyAsync(new …Run Code Online (Sandbox Code Playgroud) 在下面的代码中,调用thenRunAsync会有什么不同吗?我应该打电话给那么跑吗?
CompletableFuture.runAsync(this::doWork , executorService)
.thenRunAsync(this::handleSuccess);
Run Code Online (Sandbox Code Playgroud)
根据评论进行阐述:如果我使用此代码,
CompletableFuture.runAsync(this::doWork , executorService)
.thenRun(this::handleSuccess);
Run Code Online (Sandbox Code Playgroud)
会有什么不同吗?
在这两种情况下,行为都是非阻塞的,并且无论如何,第二个任务在第一个任务完成之前不会运行,据我所知.
假设我有一个线程调用返回completablefuture的一堆方法,并说我将所有这些添加到列表中,最后我做了completablefutures.allof(list_size).join()).现在列表中的期货可以扩展到多个核心吗?换句话说是将期货安排到多个核心以利用并行性?
这是我的情景:
我的应用程序启用了Mongo审核,使用自定义AuditorAware从当前用户获取SecurityContext.这适用于同步方法,并且当前审计员已成功保存,但我无法使其与@Async方法一起正常工作.
我有一个异步方法(CompletableFuture),可以对我的Mongo数据库进行一些更新.当AuditorAware.getCurrentAuditor()被调用时,没有任何身份验证信息存在,我不能让现任核数师(SecurityContextHolder.getContext().getAuthentication()回报null).
@Override
public User getCurrentAuditor() {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication == null || !authentication.isAuthenticated()
|| authentication instanceof AnonymousAuthenticationToken) {
log.error("Not authenticated");
return null;
}
[...]
}
Run Code Online (Sandbox Code Playgroud)
我用的是DelegatingSecurityContextAsyncTaskExecutor:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(200);
executor.initialize();
return new DelegatingSecurityContextAsyncTaskExecutor(executor);
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new ItacaExceptionHandler();
} …Run Code Online (Sandbox Code Playgroud) 我试图从我CompletableFuture这样的列表中返回一个列表:
public List<Provider> get() {
CompletableFuture<List<Provider>> providersResponse = getSomeData();
return providersResponse.thenAccept((List<Provider> providers) -> {
return providers;
});
}
Run Code Online (Sandbox Code Playgroud)
它失败了"意外的返回类型.但是如何以异步方式返回结果?
我有以下要求.
这是我目前的代码:
public CompletableFuture<ObjectId> createDeliveryNoteDocument(String productId, List<String> releaseNotesIds) {
CompletableFuture<ObjectId> deliveryNoteFuture =
CompletableFuture
.supplyAsync(() -> sequenceServiceFeignClient.getNextValueForSequenceNameNoResponseEntity(DocumentType.DELIVERYNOTE.toString()))
.whenComplete((result, error) -> {
if (error != null)
logger.error("Unable to get next sequence number for DELIVERYNOTE sequence", error);
})
.thenCompose(seqNumber -> {
Set<ObjectAttribute> objectAttributes = new HashSet<>();
objectAttributes.add(new ObjectAttribute(Constants.Document.DOCUMENT_TYPE, DocumentType.DELIVERYNOTE.toString()));
objectAttributes.add(new ObjectAttribute(Constants.Document.DOCUMENT_NO, seqNumber));
objectAttributes.add(new ObjectAttribute(Constants.Document.PRODUCT_ID, productId));
return objectCommandService.createCustomObject(new ObjectTypeTableName(Constants.ObjectTables.DOCUMENT), objectAttributes);
});
CompletableFuture<Void> releaseNotesFuture =
deliveryNoteFuture
.thenComposeAsync(deliveryNoteId -> joinReleaseNotesWithDeliveryNote(deliveryNoteId, releaseNotesIds));
CompletableFuture<Void> parcelsFuture =
deliveryNoteFuture
.thenComposeAsync(deliveryNoteId -> changeParcelsStatusForReleaseNotes(releaseNotesIds));
return deliveryNoteFuture;
}
Run Code Online (Sandbox Code Playgroud)
我怎么能等待 …
我想根据特定条件跳过可完成的功能链。我尝试了在多个完成阶段进行链接时提出的解决方案,但似乎没有用。这里的代码:
@RunWith(JUnit4.class)
public class ExceptionHandlingTests {
@Test
public void test1() {
CompletableFuture<Integer> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
System.out.println("Completing result1. Result: " + result.isDone());
result.complete(10);
}).thenCompose(x -> {
System.out.println("Completing result2. Result: " + result.isDone());
result.complete(10);
return CompletableFuture.completedFuture(5);
}).thenCompose(x -> {
System.out.println("Completing result3. Result: " + result.isDone());
result.complete(10);
return CompletableFuture.completedFuture(5);
}).applyToEither(result, Function.identity());
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
Completing result1. Result: false
Completing result2. Result: true
Completing result3. Result: true
Run Code Online (Sandbox Code Playgroud)
即使“结果”可完成期货被标记为已完成,后续的可完成期货仍将执行。如何跳过Completablefuture 2和3?
当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务.
尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁).
public static void main(String[] args) {
ExecutorService executionService = Executors.newFixedThreadPool(5);
Set< CompletableFuture<?> > tasks = new HashSet<>();
for (int i = 0; i < 1000; i++) {
final int id = i;
CompletableFuture<?> c = CompletableFuture
.runAsync( () -> {
System.out.println("Running: " + id);
if ( id == 400 ) throw new RuntimeException("Exception from: " + id);
}, executionService )
.whenComplete( (v, ex) -> {
if ( ex != null ) {
System.out.println("Shutting down.");
executionService.shutdownNow();
System.out.println("shutdown.");
}
} …Run Code Online (Sandbox Code Playgroud) I want to wrap a Runnable in CompletableFuture to be computed asynchronously, but with control over when does the computation begin and end. I've created a CompletableFuture with CountDownLatch to block the processing, but the following snippet throws an error:
CountDownLatch countDownLatch = new CountDownLatch(1);
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Stop");
});
Thread.sleep(1000L);
System.out.println("Start");
completableFuture.get(1000L, TimeUnit.MILLISECONDS);
countDownLatch.countDown();
Run Code Online (Sandbox Code Playgroud)
Start Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at Sandbox.main(Sandbox.java:23) …
使用async / await可以以命令式方式编写异步函数。这可以极大地促进异步编程。自从C#首次引入以来,它就被许多语言所采用,例如JavaScript,Python和Kotlin。
EA Async是一个向Java添加async / await之类功能的库。该库消除了使用CompletableFutures的复杂性。
但是,为什么既未将async / await添加到Java SE中,又未计划在将来添加它?
java ×9
java-8 ×8
concurrency ×3
async-await ×1
asynchronous ×1
java-threads ×1
mongodb ×1
spring ×1