我正在尝试对中的getPrice每个方法进行并行调用。我有这段代码并验证了 getPrice 正在单独的线程中运行,但它们是按顺序运行的,而不是并行运行的。谁能指出我在这里缺少什么吗?productproducts
非常感谢你的帮助。
ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
Set<Product> decoratedProductSet = products.stream()
.map(product -> CompletableFuture
.supplyAsync(() -> getPrice(product.getId(), date, context), service))
.map(t -> t.exceptionally(throwable -> null))
.map(t -> t.join())
.collect(Collectors.<Product>toSet());
Run Code Online (Sandbox Code Playgroud) 给出以下方法:
private static String getChuckNorrisJoke () {
try {
HttpURLConnection con = (HttpURLConnection) new
URL( "http://api.icndb.com/jokes/random" ).openConnection();
BufferedReader in = new BufferedReader( new InputStreamReader(con.getInputStream()));
StringBuilder response = new StringBuilder();
String line;
while ((line = in.readLine()) != null ) {
response.append(line);
}
in.close();
return response.toString();
} catch (IOException e) {
throw new IllegalStateException( "Something is wrong: " , e);
}
}
Run Code Online (Sandbox Code Playgroud)
以下语句可用于以异步方式运行该方法。
final CompletableFuture<String> jokeAsync = CompletableFuture.supplyAsync(() -> getChuckNorrisJoke());
Run Code Online (Sandbox Code Playgroud)
尽管我认为我理解CompletionStage与 的关系CompletableFuture,但我不确定如何使用它CompletionStage来完成相同的任务。
final CompletionStage<String> jokeAsync = …Run Code Online (Sandbox Code Playgroud) 我正在测试如何CompletableFuture工作。我对如何并行执行任务感兴趣:
try {
CompletableFuture one = CompletableFuture.runAsync(() -> {
throw new RuntimeException("error");
});
CompletableFuture two = CompletableFuture.runAsync(() -> System.out.println("2"));
CompletableFuture three = CompletableFuture.runAsync(() -> System.out.println("3"));
CompletableFuture all = CompletableFuture.allOf(one, two, three);
all.get();
} catch (InterruptedException e) {
System.out.println(e);
} catch (ExecutionException e) {
System.out.println(e);
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,他们将全部被处决。
1 . 当其中一个线程出现异常时,是否可以中断所有正在运行的线程?
2 . 当此代码位于可以从不同线程调用的类方法内部时,它是线程安全的吗?
foo.thenCompose(fooResponse -> {
...
return bar.thenCompose(barResponse -> {
...
});
}).exceptionally(e -> {
...
});
Run Code Online (Sandbox Code Playgroud)
这也会.exceptionally()捕获从嵌套bar.thenComposelambda 内部抛出的异常吗?或者我需要这样写:
foo.thenCompose(fooResponse -> {
...
return bar.thenCompose(barResponse -> {
...
}).exceptionally(nestedE -> {
...
});
}).exceptionally(e -> {
...
});
Run Code Online (Sandbox Code Playgroud)
然后又吐了?
我正在尝试加快对多个 API 的调用。
下面的代码中,getFilteredEvents是当前的同步版本。我有一种感觉,该map(x -> x.getFilteredEvents(eventResearch))操作将等待每个 API(它RestTemplate.exchange()在内部使用)的响应,然后再传递到下一个 API 以构建List<Event>我想要返回的 API。解决方案可能是map在单独的线程上启动调用,但我想尝试一下CompletableFutureAPI。
因此,这getFilteredEventsFaster是我努力改善响应时间的结果。
@Service
public class EventsResearchService {
@Autowired
private List<UniformEventsResearchApi> eventsResearchApis;
// this works, but I'm trying to improve it
public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
List<Event> eventsList = eventsResearchApis
.stream()
.map(x -> x.getFilteredEvents(eventResearch))
.flatMap(List::stream)
.collect(Collectors.toList());
return extractResponse(eventResearch, eventsList);
}
// this doesn't work yet: what is wrong?
public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
.parallelStream() …Run Code Online (Sandbox Code Playgroud) 我正在下面运行一些异步任务,需要等待它们全部完成。我不知道为什么,但它join()并没有强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?
CompletableFutures列表只是一个映射supplyAsync的流
List<Integer> items = Arrays.asList(1, 2, 3);
List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
System.out.println("processing");
// do some processing here
return item;
}))
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
我等待期货之类的。
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)
我可以等待,futures.forEach(CompletableFuture::join);但我想知道为什么我的流方法不起作用。
我在“Java 8 in Action”中找不到任何关于为什么CompletableFuture故意忽略mayInterruptIfRunning. 但即便如此,我也没有真正看到 custom 的任何钩子cancel(boolean),这在中断不影响阻塞操作(例如 I/O 流、简单锁等)的情况下会派上用场。到目前为止,任务本身似乎是预期的钩子,在抽象级别上工作在Future这里没有任何好处。
因此,我要问的是为了从这种情况中挤出一些 neet 自定义取消机制而必须引入的最少样板代码集。
我想返回包含所有键到值的映射的映射,这些值是 API 对这些键的响应。我正在使用CompletableFutureandGuava为此。以下是我的尝试。是否有其他标准方法可以使用 Java 8 和线程 API 实现相同的效果?
地图之中id -> apiResponse(id)。
public static List<String> returnAPIResponse(Integer key) {
return Lists.newArrayList(key.toString() + " Test");
}
public static void main(String[] args) {
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);
List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
.stream()
.map(key -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
.collect(Collectors.toList());
System.out.println(
futures.parallelStream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud) 我正在将 Sleuth 与CompletableFuture.handle. 例子:
log.info("first");
apnsClient.sendNotification(...).handle((response, cause) -> {
log.info("second");
});
Run Code Online (Sandbox Code Playgroud)
我希望second日志具有与first日志相同的跟踪 ID。然而,事实并非如此。因此我想知道该怎么办?谢谢!
PS 我无法控制如何apnsClient.sendNotification管理线程(因为它来自Pushy),因此无法使用LazyTraceExecutor之类的东西。
java spring spring-cloud completable-future spring-cloud-sleuth
在详细讨论问题之前,我想先介绍一下问题的背景。基本上,我的代码看起来像
for(int i = 0; i < n; i++){
startTime = System.currentTimeMillis();
result = doSomething();
endTime = System.currentTimeMillis();
responseTime.add(endTime - startTime);
results.add(result);
}
print(results and responseTime);
Run Code Online (Sandbox Code Playgroud)
现在我想做的是将doSomething()作为 completableFuture 运行并获取上面提到的responseTime。但因为这样做是完全错误的——
for(int i = 0; i < n; i++){
startTime = System.currentTimeMillis();
resultCF = CompletableFuture.supplyasync(() -> doSomething());
endTime = System.currentTimeMillis();
responseTime.add(endTime - startTime);
results.add(resultCF); //result is to store the completableFutures
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[results.size()])).join();
print(i.get() for i in results and responseTimes);
Run Code Online (Sandbox Code Playgroud)
因为它会破坏获取每个doSomething()的执行时间的目的。那么,有什么方法可以获得每个 completableFuture 的响应时间吗?另外,我需要在循环末尾包含 completableFutures( resultCF ) 的结果数组列表。
java ×9
java-8 ×4
asynchronous ×3
spring ×2
exception ×1
future ×1
guava ×1
lambda ×1
spring-boot ×1
spring-cloud ×1
spring-mvc ×1
time ×1