我正在通过CompletableFuture收到服务电话的回复.我想处理服务返回的一些已知异常 - 例如乐观并发控制冲突.
这就是我所拥有的.有没有更好的方法来做这个不包装异常或使用SneakyThrows?包装异常意味着其他异常处理程序必须检查因果链而不仅仅是使用instanceof.
someService.call(request)
.handle((response, error) -> {
if (error == null)
return CompletableFuture.completedFuture(response);
else if (error instanceof OCCException)
return CompletableFuture.completedFuture(makeDefaultResponse());
CompletableFuture<Response> errorFuture = new CompletableFuture<>();
errorFuture.completeExceptionally(error);
return errorFuture;
}).thenCompose(Function.identity());
Run Code Online (Sandbox Code Playgroud)
同样的,有没有办法复制番石榴withFallback没有包装解开?
CompletableFuture<T> withFallback(CompletableFuture<T> future,
Function<Throwable, ? extends CompletableFuture<T>> fallback) {
return future.handle((response, error) -> {
if (error == null)
return CompletableFuture.completedFuture(response);
else
return fallback.apply(error);
}).thenCompose(Function.identity());
}
...
// Here's the first part of the question implemented using withFallback.
// It's a little cleaner, but it …Run Code Online (Sandbox Code Playgroud) 鉴于新的Java8,我们为异步任务获得了非常好的功能,例如CompletableFuture和.paralellStream().如果你在Java SE中运行它,因为我已经理解它你将使用ForkJoinPool,但是如果我在例如Wildfly或TomcatEE中运行以下示例会发生什么?
//Here I start a comp.Future without giving an Executor
test = CompletableFuture.supplyAsync(() -> timeConsumingMethod());
//Here I start a parallel stream
mList.paralell().filter(...).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)
会发生什么,我将从哪里借用我的资源
在Async Http Client文档中,我看到如何获取Future<Response>异步HTTP Get请求的结果,例如:
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
Future<Response> f = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/398")
.execute();
Response r = f.get();
Run Code Online (Sandbox Code Playgroud)
但是,为方便起见,我希望得到一个CompletableFuture<T>替代方案,我可以应用一个将结果转换为其他内容的延续,例如将响应内容从Json反序列化为Java对象(例如SoccerSeason.java).这就是我想做的事情:
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
CompletableFuture<Response> f = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/398")
.execute();
f
.thenApply(r -> gson.fromJson(r.getResponseBody(), SoccerSeason.class))
.thenAccept(System.out::println);
Run Code Online (Sandbox Code Playgroud)
根据Async Http Client文档,执行此操作的唯一方法是通过AsyncCompletionHandler<T>对象和使用promise.所以我为此构建了一个辅助方法:
CompletableFuture<Response> getDataAsync(String path){
CompletableFuture<Response> promise = new CompletableFuture<>();
asyncHttpClient
.prepareGet(path)
.execute(new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response response) throws Exception {
promise.complete(response);
return response;
}
@Override …Run Code Online (Sandbox Code Playgroud) 当我CompletableFuture将两个独立的组合与一个BiFunction返回第三个独立的组合时,我试图避免嵌套.目前,使用thenCombine()不削减它:
// What I have
public CompletableFuture<CompletableFuture<C>> doStuff() {
CompletableFuture<A> aFuture = makeSomeA();
CompletableFuture<B> bFuture = makeSomeB();
CompletableFuture<CompletableFuture<C>> cFuture = aFuture.thenCombine(bFuture, this::makeSomeC);
return cFuture;
}
// What I want
public CompletableFuture<C> doStuff() {
CompletableFuture<A> aFuture = makeSomeA();
CompletableFuture<B> bFuture = makeSomeB();
// obv this method does not exist
CompletableFuture<C> c = aFuture.thenBicompose(bFuture, this::makeSomeC);
}
private CompletableFuture<A> makeSomeA() {...}
private CompletableFuture<B> makeSomeB() {...}
private CompletableFuture<C> makeSomeC(A a, B b) {...}
Run Code Online (Sandbox Code Playgroud)
我基本上试图找到一种看起来像haskell的方式,如果有一个CompletableFuturemonad:
doStuff :: …Run Code Online (Sandbox Code Playgroud) 我正在阅读java 8的实际操作,作者引用此链接:http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html
并编写自己的流forker,看起来像这样:
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Main {
public static void main(String... args) {
List<Person> people = Arrays.asList(new Person(23, "Paul"), new Person(24, "Nastya"), new Person(30, "Unknown"));
StreamForker<Person> forker = new StreamForker<>(people.stream())
.fork("All names", s -> s.map(Person::getName).collect(Collectors.joining(", ")))
.fork("Age stats", s -> s.collect(Collectors.summarizingInt(Person::getAge)))
.fork("Oldest", s -> s.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
Results results = …Run Code Online (Sandbox Code Playgroud) multithreading blockingqueue java-8 java-stream completable-future
我有这段代码,我想重构Java 8
List<String> menus = new ArrayList<String>();
for (Menu menu : resto1.getMenu()) {
MainIngredient mainIngredient = MainIngredient.getMainIngredient(menu.getName());
if (mainIngredient.getIngredient().indexOf("Vegan")!=-1) {
menus.add(menu.getName());
}
}
Run Code Online (Sandbox Code Playgroud)
重构这个简单的循环之后,似乎代码太多......我是否正确使用CompletableFutures?
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<MainIngredient>> priceFutureList = resto1.getMenu().stream()
.map(menu -> CompletableFuture.supplyAsync(
() -> MainIngredient.getMainIngredient(menu.getName()), executorService))
.collect(Collectors.toList());
CompletableFuture<Void> allFuturesDone = CompletableFuture.allOf(
priceFutureList.toArray(new CompletableFuture[priceFutureList.size()]));
CompletableFuture<List<MainIngredient>> priceListFuture =
allFuturesDone.thenApply(v -> priceFutureList.stream()
.map(CompletableFuture::join)
.collect(toList()));
Run Code Online (Sandbox Code Playgroud) 考虑以下代码 -
public class TestCompletableFuture {
BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
System.out.println(x);
System.out.println(y);
};
public static void main(String args[]) {
TestCompletableFuture testF = new TestCompletableFuture();
testF.start();
}
public void start() {
Supplier<Integer> numberSupplier = new Supplier<Integer>() {
@Override
public Integer get() {
return SupplyNumbers.sendNumbers();
}
};
CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);
}
}
class SupplyNumbers {
public static Integer sendNumbers(){
return 25; // just for working sake its not correct.
}
}
Run Code Online (Sandbox Code Playgroud)
以上的事情很好.但是sendNumbers也可以在我的情况下抛出一个检查过的异常,例如:
class SupplyNumbers {
public …Run Code Online (Sandbox Code Playgroud) 如果我有
CompletableFuture<Something> future1 = service.request(param1);
CompletableFuture<Something> future2 = service.request(param2);
CompletableFuture<Void> many = CompletableFuture.allOf(future1, future2);
Run Code Online (Sandbox Code Playgroud)
我什么时候会发生什么many.cancel()?将future1和future2一并取消?如果没有,最简单的方法是什么?我不愿意留住future1和future2,只是为了能够取消他们时,我想取消many.
关于我为什么要这样做的一些背景:当接收一条数据时,我需要请求匹配的,可能未来的数据来执行计算.如果有更新的数据到达,我想取消先前计算的完成,因为结果将立即被新计算取代.
我必须运行多个外部调用操作,然后以列表的形式获取结果。我决定使用CompletableFutureapi,我准备的代码很恶心:
这个例子:
public class Main {
public static void main(String[] args) {
String prefix = "collection_";
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
.boxed()
.map(num -> prefix.concat("" + num))
.map(name -> CompletableFuture.supplyAsync(
() -> callApi(name)))
.collect(Collectors.toList());
try {
CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
List<User> users = usersResult //the result I need
.stream()
.map(userCompletableFuture -> {
try {
return userCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
} …Run Code Online (Sandbox Code Playgroud) 为什么这段代码在 Java 8 和 Java 11 中表现不同?
private static String test2() {
CompletableFuture
.runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
try {
Thread.sleep(500);
System.out.println(x);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
return "Finish";
}
Run Code Online (Sandbox Code Playgroud)
我希望它打印完成,然后以 500 毫秒的间隔打印从 1 到 20 的数字,然后停止执行,它在 Java 8 中正常工作。
然而,当我在 Java 11 上运行完全相同的方法时,它打印 Finish 并终止,而没有调用 runAsync(...) 代码。我设法通过添加这样的 ExecutorService 来启动它
private static String test2() {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture
.runAsync(() -> IntStream.rangeClosed(1, 10).forEach(x -> {
try {
Thread.sleep(500);
System.out.println(x);
} catch (InterruptedException e) { …Run Code Online (Sandbox Code Playgroud) java ×8
java-8 ×7
asynchronous ×3
java-stream ×2
exception ×1
haskell ×1
java-11 ×1
java-ee ×1
java-ee-7 ×1