标签: completable-future

Java CompletableFuture + Resteasy

我一直在使用 Java 的 CompletableFuture 这样

CompletableFuture.runAsync(() -> {//Some code here });
Run Code Online (Sandbox Code Playgroud)

当我尝试在此代码块中使用 Resteasy Client 时,我得到一个

javax.ws.rs.ProcessingException: Unable to find a MessageBodyReader of content-type application/json;charset=utf-8 and type class java.lang.String
Run Code Online (Sandbox Code Playgroud)

如果我在 completablefuture 之外使用客户端,则它可以工作。Resteasy 代码看起来像这样

        ResteasyClient client = new ResteasyClientBuilder().build();
        client.register(new AcceptEncodingFilter("gzip"));
        ResteasyWebTarget target = client.target(exampleURL);

        target = target.queryParam("1", 1)
                .queryParam("2", "1")
                .queryParam("3", 3)
                .queryParam("4", 4)
                .queryParam("5", "5");

        Response response = target.request().get();
        resultString = response.readEntity(String.class);
Run Code Online (Sandbox Code Playgroud)

我将在 completablefuture 之外运行 resteasy 代码来“修复”问题,但想了解为什么会发生这种情况。

CompletableFuture 中的 resteasy 代码如下所示:

CompletableFuture.runAsync(() -> {
            try {
                ResteasyClient client = new …
Run Code Online (Sandbox Code Playgroud)

java resteasy java-8 completable-future

5
推荐指数
1
解决办法
978
查看次数

将未来可完成的异常映射到不同的异常类型?

我正在使用 java 8 的可完成的 future,我希望能够接受 future 抛出的异常并将其转换为不同的异常。

一旦发生异常,我尝试过的所有复合材料似乎都会短路。

例如,使用 scala future,我可以做这样的事情:

scala.concurrent.Future<Object> translatedException = ask.recover(new Recover<Object>() {
            @Override public Object recover(final Throwable failure) throws Throwable {
                if (failure instanceof AskTimeoutException) {
                    throw new ApiException(failure);
                }

                throw failure;
            }
        }, actorSystem.dispatcher());
Run Code Online (Sandbox Code Playgroud)

我希望能够在未来的 java 复合块中模仿这一点。这可能吗?

java exception completable-future

5
推荐指数
1
解决办法
3532
查看次数

为什么 CompletableFuture 的 thenAccept() 不在主线程上运行

我在 CompletableFuture 的 SupplyAsync() 中处理长时间运行的操作,并将结果放入 thenAccept() 中。有时 thenAccept() 在主线程上执行,但有时它在工作线程上运行。但我只想在主线程上运行 thenAccept() 操作。这是示例代码。

private void test() {

    ExecutorService executorService = Executors.newSingleThreadExecutor();

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync | I am running on : " + Thread.currentThread().getName());
        return "Hello world";
    }, executorService);

    CompletableFuture<Void> cf3 = cf1.thenAccept(s -> {
        System.out.print("thenAccept | I am running on : " + Thread.currentThread().getName());
        System.out.println(" | answer : " + s);
    });

    cf3.thenRun(() -> {
        System.out.println("thenRun | I am running on : " + Thread.currentThread().getName());
        System.out.println();
    });

} …
Run Code Online (Sandbox Code Playgroud)

java completable-future

5
推荐指数
1
解决办法
5182
查看次数

嵌套期货未执行

我遇到了一个奇怪的情况。我正在摆弄CompletableFuture,当运行以下代码时,我得到了意想不到的结果:

public static void main(String[] args) {     
    CompletableFuture<CompletableFuture<CompletableFuture<CompletableFuture<CompletableFuture<CompletableFuture<Object>>>>>> completableFutureCompletableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("first");
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("second");
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("third");
                return CompletableFuture.supplyAsync(() -> {
                    System.out.println("fourth");
                    return CompletableFuture.supplyAsync(() -> {
                        System.out.println("fifth");
                        return CompletableFuture.completedFuture(null);
                    });
                });
            });
        });
    });

   completableFutureCompletableFuture.get();
}
Run Code Online (Sandbox Code Playgroud)

没有抛出异常(即使使用时exceptionally),我看到的是控制台输出是

first
second
third // appears sometimes
Run Code Online (Sandbox Code Playgroud)

现在,显然这段代码没有真正的生产价值,但这代表了一种情况,即您的代码具有未知数量的嵌套,其中每个或其中一些嵌套创建的嵌套CompleteableFutures将不会被执行。

任何解释(以及如何修复的示例)将不胜感激

java multithreading completable-future

5
推荐指数
2
解决办法
2416
查看次数

从垃圾收集中不直观地驱逐对象

我正在调试内存泄漏,并且不得不深入了解 CompletableFuture 的内部结构。有这段代码(CompletableFuture.uniComposeStage):

CompletableFuture<V> g = f.apply(t).toCompletableFuture();
...
CompletableFuture<V> d = new CompletableFuture<V>();
UniRelay<V> copy = new UniRelay<V>(d, g);
g.push(copy);
copy.tryFire(SYNC);
return d;
Run Code Online (Sandbox Code Playgroud)

代码本身对我来说非常清楚:应用一个返回 CompletionStage ( g) 的函数,创建一个中继,最终将值传输到另一个 CompletableFuture ( d) ,然后返回另一个 future ( d) 。我看到以下参考情况:

  • copy引用dand g(构造函数中没有魔法,只有字段赋值)
  • g参考copy
  • d没有引用任何内容

d返回,因此,事实上,对我来说,gcopy似乎都是内部方法变量,(乍一看)永远不应该离开该方法并最终被GC。幼稚的测试和它是由经过验证的开发人员很久以前编写的事实都表明我错了并且遗漏了一些东西。是什么原因导致这些对象被垃圾收集忽略?

java garbage-collection completable-future

5
推荐指数
1
解决办法
510
查看次数

Java 中的异步 File.copy

Java中有没有一种方法可以以异步方式将一个文件复制到另一个文件中?我试图找到类似于C# 中的Stream.CopyToAsync的东西。

我想要实现的是从互联网下载一系列约 40 个文件,这是我为每个文件想到的最好的结果:

CompletableFuture.allOf(myFiles.stream()
        .map(file -> CompletableFuture.supplyAsync(() -> syncDownloadFile(file)))
        .toArray(CompletableFuture[]::class))
    .then(ignored -> doSomethingAfterAllDownloadsAreComplete());
Run Code Online (Sandbox Code Playgroud)

哪里syncDownloadFile

private void syncDownloadFile(MyFile file) {
    try (InputStream is = file.mySourceUrl.openStream()) {
        long actualSize = Files.copy(is, file.myDestinationNIOPath);
        // size validation here
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
Run Code Online (Sandbox Code Playgroud)

但这意味着我在任务执行器内部有一些阻塞调用,我想避免这种情况,所以我不会一次阻塞太多执行器。

我不确定 C# 方法内部是否执行相同的操作(我的意思是,必须有东西下载该文件,对吗?)。

有更好的方法来实现这一点吗?

java nio completable-future

5
推荐指数
1
解决办法
1698
查看次数

如果没有多线程,使用Future有什么意义呢?

我继承了一些代码,原来的开发人员已经没有人留下了。该代码大量使用了CompletableFuture,这是我第一次使用它,所以我仍在尝试理解它。据我了解, a(Completable)Future通常与某种多线程机制一起使用,该机制允许我们在执行耗时的任务时执行其他操作,然后只需通过 Future 获取其结果。正如javadoc中所示:

interface ArchiveSearcher { String search(String target); }
class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target) throws InterruptedException {
        Future<String> future = executor.submit(new Callable<String>() {
        public String call() {
            return searcher.search(target);
        }});
        displayOtherThings(); // do other things while searching
        try {
            displayText(future.get()); // use future
        } catch (ExecutionException ex) { cleanup(); return; }
    }
}
Run Code Online (Sandbox Code Playgroud)

然而,在我继承的这个应用程序中,以下不使用任何多线程的模式多次出现:

public Object serve(Object input) throws ExecutionException, InterruptedException { …
Run Code Online (Sandbox Code Playgroud)

java multithreading asynchronous future completable-future

5
推荐指数
1
解决办法
1839
查看次数

从 CompletableFuture.allof() 获取单独的结果

我有一个类,它使用 CompletableFutures 向两个依赖服务发出并发请求。

我的代码如下所示:

@Builder
@Slf4j
public class TestClass {

    @NonNull private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
    @NonNull private final dependency1Client;
    @NonNull private final dependency2Client;

    public void myMethod() {

        RequestObject1 firstDependencyRequest = RequestObject1.builder()
                .attribute1("someValue")
                .attribute2("secondValue");

        CompletableFuture<ResultStructure1> future1 = CompletableFuture.supplyAsync(() -> dependency1Client.call(firstDependencyRequest), threadPool);
        RequestObject2 secondDependencyRequest = RequestObject2.builder()
                .attribute1("someValue")
                .attribute2("secondValue");

        CompletableFuture<ResultStructure2> future2 = CompletableFuture.supplyAsync(() -> dependency2Client.call(secondDependencyRequest), threadPool);

        try {
            CompletableFuture finalFuture = CompletableFuture.allOf(future1, future2);

        } catch (ExecutionException|InterruptedException e) {
            log.error("Exception calling dependency", e);
            throw new RuntimeException(e);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我需要对依赖服务的两次调用的结果。如何在不执行阻塞调用的情况下获取它们?我最初以为我会执行 future1 .get(),但这是一个阻塞调用,我必须等到获得第一个 …

java concurrency completable-future

5
推荐指数
1
解决办法
4405
查看次数

使用 CompletableFuture 抛出已检查的异常

Stackoverflow 包含多个有关将检查异常与CompletableFuture.

这里有一些例子:

虽然一些答案暗示使用CompletableFuture.completeExceptionally()他们的方法会导致用户代码难以阅读。

我将利用这个空间提供一个替代解决方案,以提高可读性。

请注意,这个问题特定于 CompletableFuture。这使我们能够提供不更普遍地扩展到 lambda 表达式的解决方案。

java completable-future

5
推荐指数
1
解决办法
5028
查看次数

具有 CompletableFuture 的非阻塞异步 Jersey JAX-RS

我正在研究 Jersey,我在一本书中看到您可以使用 CompletableFuture (和 CompletitionStage)以非阻塞 IO 方式调用您的 API。

但是当我用Postman调用API时,我总是得到500。

如果我调试代码,我会发现这些方法被正确调用。

第一个 GET 方法是同步的并且可以正确工作。第二次和第三次返回错误500

我缺少什么?

@Path("/hello")
public class HelloController {

  @GET
  @Path("/first")
  @Produces(MediaType.TEXT_PLAIN)
  public String first() {
    return "It works";
  }

  @GET
  @Path("/second")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletionStage<Response> second() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }

  @GET
  @Path("/third")
  @Produces(MediaType.TEXT_PLAIN)
  public CompletableFuture<Response> third() {
    return CompletableFuture.supplyAsync(() -> Response.accepted().entity("Hello!").build());
  }
}
Run Code Online (Sandbox Code Playgroud)

java asynchronous jax-rs jersey completable-future

5
推荐指数
1
解决办法
3061
查看次数