标签: completable-future

CompletableFuture withFallback /只处理一些错误

我正在通过CompletableFuture收到服务电话的回复.我想处理服务返回的一些已知异常 - 例如乐观并发控制冲突.

这就是我所拥有的.有没有更好的方法来做这个不包装异常或使用SneakyThrows?包装异常意味着其他异常处理程序必须检查因果链而不仅仅是使用instanceof.

someService.call(request)
    .handle((response, error) -> {
        if (error == null)
            return CompletableFuture.completedFuture(response);
        else if (error instanceof OCCException)
            return CompletableFuture.completedFuture(makeDefaultResponse());

        CompletableFuture<Response> errorFuture = new CompletableFuture<>();
        errorFuture.completeExceptionally(error);
        return errorFuture;
    }).thenCompose(Function.identity());
Run Code Online (Sandbox Code Playgroud)

同样的,有没有办法复制番石榴withFallback没有包装解开?

CompletableFuture<T> withFallback(CompletableFuture<T> future,
                                  Function<Throwable, ? extends CompletableFuture<T>> fallback) {
    return future.handle((response, error) -> {
        if (error == null)
            return CompletableFuture.completedFuture(response);
        else
            return fallback.apply(error);
    }).thenCompose(Function.identity());
}


...
// Here's the first part of the question implemented using withFallback.
// It's a little cleaner, but it …
Run Code Online (Sandbox Code Playgroud)

java java-8 completable-future

8
推荐指数
2
解决办法
6420
查看次数

JavaEE应用服务器中的CompletableFuture/parallelStream

鉴于新的Java8,我们为异步任务获得了非常好的功能,例如CompletableFuture和.paralellStream().如果你在Java SE中运行它,因为我已经理解它你将使用ForkJoinPool,但是如果我在例如Wildfly或TomcatEE中运行以下示例会发生什么?

//Here I start a comp.Future without giving an Executor
test = CompletableFuture.supplyAsync(() -> timeConsumingMethod());
//Here I start a parallel stream 
mList.paralell().filter(...).collect(Collectors.toList())
Run Code Online (Sandbox Code Playgroud)

会发生什么,我将从哪里借用我的资源

  1. 这些示例在@Stateful bean中运行
  2. 这些示例在@Stateless bean中运行
  3. 这些示例在CDI bean中运行

java asynchronous java-ee java-ee-7 completable-future

8
推荐指数
1
解决办法
1550
查看次数

如何从Async Http Client请求获取CompletableFuture <T>?

Async Http Client文档中,我看到如何获取Future<Response>异步HTTP Get请求的结果,例如:

AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
Future<Response> f = asyncHttpClient
      .prepareGet("http://api.football-data.org/v1/soccerseasons/398")
      .execute();
Response r = f.get();
Run Code Online (Sandbox Code Playgroud)

但是,为方便起见,我希望得到一个CompletableFuture<T>替代方案,我可以应用一个将结果转换为其他内容的延续,例如将响应内容从Json反序列化为Java对象(例如SoccerSeason.java).这就是我想做的事情:

AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient();
CompletableFuture<Response> f = asyncHttpClient
     .prepareGet("http://api.football-data.org/v1/soccerseasons/398")
     .execute();
f
     .thenApply(r -> gson.fromJson(r.getResponseBody(), SoccerSeason.class))
     .thenAccept(System.out::println);
Run Code Online (Sandbox Code Playgroud)

根据Async Http Client文档,执行此操作的唯一方法是通过AsyncCompletionHandler<T>对象和使用promise.所以我为此构建了一个辅助方法:

CompletableFuture<Response> getDataAsync(String path){
    CompletableFuture<Response> promise = new CompletableFuture<>();
    asyncHttpClient
            .prepareGet(path)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public Response onCompleted(Response response) throws Exception {
                    promise.complete(response);
                    return response;
                }
                @Override …
Run Code Online (Sandbox Code Playgroud)

asynchronous java-8 asynchttpclient completable-future

8
推荐指数
1
解决办法
6129
查看次数

具有CompletableFuture的"双重"组合

当我CompletableFuture将两个独立的组合与一个BiFunction返回第三个独立的组合时,我试图避免嵌套.目前,使用thenCombine()不削减它:

// What I have
public CompletableFuture<CompletableFuture<C>> doStuff() {
    CompletableFuture<A> aFuture = makeSomeA();
    CompletableFuture<B> bFuture = makeSomeB();
    CompletableFuture<CompletableFuture<C>> cFuture = aFuture.thenCombine(bFuture, this::makeSomeC);
    return cFuture;
}

// What I want
public CompletableFuture<C> doStuff() {
    CompletableFuture<A> aFuture = makeSomeA();
    CompletableFuture<B> bFuture = makeSomeB();
    // obv this method does not exist
    CompletableFuture<C> c = aFuture.thenBicompose(bFuture, this::makeSomeC);
} 

private CompletableFuture<A> makeSomeA() {...}
private CompletableFuture<B> makeSomeB() {...}
private CompletableFuture<C> makeSomeC(A a, B b) {...}
Run Code Online (Sandbox Code Playgroud)

我基本上试图找到一种看起来像haskell的方式,如果有一个CompletableFuturemonad:

doStuff :: …
Run Code Online (Sandbox Code Playgroud)

java haskell java-8 completable-future

8
推荐指数
2
解决办法
1044
查看次数

使用多个流来分配流的优势是什么?

我正在阅读java 8的实际操作,作者引用此链接:http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html

并编写自己的流forker,看起来像这样:

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String... args) {
        List<Person> people = Arrays.asList(new Person(23, "Paul"), new Person(24, "Nastya"), new Person(30, "Unknown"));
        StreamForker<Person> forker = new StreamForker<>(people.stream())
                .fork("All names", s -> s.map(Person::getName).collect(Collectors.joining(", ")))
                .fork("Age stats", s -> s.collect(Collectors.summarizingInt(Person::getAge)))
                .fork("Oldest", s -> s.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
        Results results = …
Run Code Online (Sandbox Code Playgroud)

multithreading blockingqueue java-8 java-stream completable-future

8
推荐指数
1
解决办法
182
查看次数

Java8中的CompletableFuture

我有这段代码,我想重构Java 8

List<String> menus = new ArrayList<String>();           
for (Menu menu : resto1.getMenu()) {            
    MainIngredient mainIngredient = MainIngredient.getMainIngredient(menu.getName());           
    if (mainIngredient.getIngredient().indexOf("Vegan")!=-1) {
        menus.add(menu.getName());
    }                   
}
Run Code Online (Sandbox Code Playgroud)

重构这个简单的循环之后,似乎代码太多......我是否正确使用CompletableFutures?

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<MainIngredient>> priceFutureList = resto1.getMenu().stream()
    .map(menu -> CompletableFuture.supplyAsync(
        () -> MainIngredient.getMainIngredient(menu.getName()), executorService))
    .collect(Collectors.toList());        

CompletableFuture<Void> allFuturesDone = CompletableFuture.allOf(
    priceFutureList.toArray(new CompletableFuture[priceFutureList.size()]));

CompletableFuture<List<MainIngredient>> priceListFuture =        
    allFuturesDone.thenApply(v -> priceFutureList.stream()
        .map(CompletableFuture::join)
        .collect(toList()));
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream completable-future

8
推荐指数
1
解决办法
122
查看次数

使用CompletableFuture处理Java 8供应商异常

考虑以下代码 -

public class TestCompletableFuture {

    BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        testF.start();      
    }

    public void start() {
        Supplier<Integer> numberSupplier = new Supplier<Integer>() {
            @Override
            public Integer get() {
                return SupplyNumbers.sendNumbers();                     
            }
        };
        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);         
    }       
}

class SupplyNumbers {
    public static Integer sendNumbers(){
        return 25; // just for working sake its not  correct.
    }
}
Run Code Online (Sandbox Code Playgroud)

以上的事情很好.但是sendNumbers也可以在我的情况下抛出一个检查过的异常,例如:

class SupplyNumbers {
    public …
Run Code Online (Sandbox Code Playgroud)

java exception java-8 completable-future

7
推荐指数
3
解决办法
1万
查看次数

递归取消allOf CompletableFuture

如果我有

CompletableFuture<Something> future1 = service.request(param1);
CompletableFuture<Something> future2 = service.request(param2);
CompletableFuture<Void> many = CompletableFuture.allOf(future1, future2);
Run Code Online (Sandbox Code Playgroud)

我什么时候会发生什么many.cancel()?将future1future2一并取消?如果没有,最简单的方法是什么?我不愿意留住future1future2,只是为了能够取消他们时,我想取消many.

关于我为什么要这样做的一些背景:当接收一条数据时,我需要请求匹配的,可能未来的数据来执行计算.如果有更新的数据到达,我想取消先前计算的完成,因为结果将立即被新计算取代.

java java-8 completable-future

7
推荐指数
1
解决办法
1034
查看次数

Java从多次调用中收集CompletableFuture的结果

我必须运行多个外部调用操作,然后以列表的形式获取结果。我决定使用CompletableFutureapi,我准备的代码很恶心:

这个例子:

public class Main {
    public static void main(String[] args) {
        String prefix = "collection_";

        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    } …
Run Code Online (Sandbox Code Playgroud)

java multithreading completable-future

7
推荐指数
1
解决办法
8578
查看次数

为什么我的 CompletableFuture 代码在 Java 8 中运行而不在 Java 11 中运行?

为什么这段代码在 Java 8 和 Java 11 中表现不同?

private static String test2() {
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));

    return "Finish";
}
Run Code Online (Sandbox Code Playgroud)

我希望它打印完成,然后以 500 毫秒的间隔打印从 1 到 20 的数字,然后停止执行,它在 Java 8 中正常工作。

然而,当我在 Java 11 上运行完全相同的方法时,它打印 Finish 并终止,而没有调用 runAsync(...) 代码。我设法通过添加这样的 ExecutorService 来启动它

private static String test2() {

    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 10).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) { …
Run Code Online (Sandbox Code Playgroud)

java multithreading asynchronous completable-future java-11

7
推荐指数
1
解决办法
373
查看次数