标签: completable-future

在RxJava中取消超时任务

我正在尝试使用RxJava和Java 8的CompletableFuture类,并且不太了解如何处理超时条件.

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;

// ...

    Observable<String> doSomethingSlowly() {
        CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
            // this call may be very slow - if it takes too long, 
            // we want to time out and cancel it.
            return processor.slowExternalCall();

        });

        return toObservable(task);
    }

    // ...

    doSomethingSlowly()
        .single()
        .timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
Run Code Online (Sandbox Code Playgroud)

这基本上有效(如果达到超时三秒,则发布"超时").然而,我还想取消我已经包含的未来任务Observable- 是否可能采用以RxJava为中心的方法?

我知道一个选项是自己处理超时,使用task.get(3, TimeUnit.SECONDS),但我想知道是否可以在RxJava中完成所有任务处理.

java future rx-java completable-future

6
推荐指数
1
解决办法
6943
查看次数

C#Task.ContinueWith()vs java?

在C#中,Task类有ContinueWith方法,当任务运行完成状态时,ContinueWith方法将被调用,而在JAVA中,是否有一些方法如ContinueWith

我知道番石榴listenablefuture,但它使用一个新的线程来等待'任务'完成,它是否等于C# ContinueWith

和JAVA 8 具有相同的效果,那么C#的区别是 什么?CompletableFuture whenCompleteContinueWith listenablefuture CompletableFuture

谢谢!

c# java guava completable-future

6
推荐指数
1
解决办法
502
查看次数

通过 CompletableFuture 调用包含的类方法时,WebTarget Bean 实例化在 Spring 中失败

我正在从我的 Spring Boot 应用程序调用不同的其他 Web 服务。为了提高性能而不是顺序调用它们,我想实现CompletableFuture.supplyAsync()异步执行。

我正在调用的服务,其中一些是内部的,一些是外部的。因此,对于内部,我直接调用它们的 api 接口,这些接口存在于 Maven 依赖项中,而对于外部接口,我正在使用javax.ws.rs.client.WebTarget

在实现调用时,CompletableFuture.supplyAsync()内部的客户端类会成功执行,而具有依赖关系的客户端类则会WebTarget因未创建 bean 而@Inject失败。scopedTarget.XXService_nameXX_wt

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.charityserv_wt': Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [javax.ws.rs.client.WebTarget]: Factory method 'target' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.coreContext': Scope 'request' is not active for the current thread
Run Code Online (Sandbox Code Playgroud)

调用代码:

CompletableFuture<SearchResult> future = CompletableFuture.supplyAsync(() -> searchServiceClientImpl.getsearchSearchRequest(encryptedAccountNumber));
Run Code Online (Sandbox Code Playgroud)

服务客户端类:

@Component
public class …
Run Code Online (Sandbox Code Playgroud)

java spring jax-rs spring-boot completable-future

6
推荐指数
0
解决办法
1336
查看次数

具有CompletableFutures的异步非阻塞任务

我需要在Java 8中创建一个异步,非阻塞的任务,我想使用CompletableFutures,但是我不确定它是否满足我的需求。

为了简化这种情况,假设我们有一个API,可以为用户检索一些数据,但同时希望启动一个单独的任务来执行一些操作。我不需要也不想等待此单独的任务完成,我想立即将响应发送给用户。模拟代码中的一个示例:

public Response doSomething(params) {
  Object data = retrieveSomeData(params);

  // I don't want to wait for this to finish, I don't care if it succeeds or not
  doSomethingNoWait(data);

  return new Response(data);
}
Run Code Online (Sandbox Code Playgroud)

我正在看CompletableFutures,像这样:

CompletableFuture.supplyAsync(this::doSomethingNoWait)  
             .thenApply(this::logSomeMessage); 
Run Code Online (Sandbox Code Playgroud)

我想知道那是正确的方法吗?在doSomethingNoWait完成必须做的事情之前,响应会返回给用户吗?

谢谢!

multithreading java-8 completable-future

6
推荐指数
1
解决办法
1435
查看次数

如何收集顺序调用异步 API 的结果?

我有一个异步 API,它本质上是通过分页返回结果

public CompletableFuture<Response> getNext(int startFrom);
Run Code Online (Sandbox Code Playgroud)

每个Response对象都包含一个偏移量列表startFrom和一个标志,该标志指示是否还有更多剩余元素,因此是否getNext()需要发出另一个请求。

我想编写一个方法来遍历所有页面并检索所有偏移量。我可以像这样以同步方式编写它

int startFrom = 0;
List<Integer> offsets = new ArrayList<>();

for (;;) {
    CompletableFuture<Response> future = getNext(startFrom);
    Response response = future.get(); // an exception stops everything
    if (response.getOffsets().isEmpty()) {
        break; // we're done
    }
    offsets.addAll(response.getOffsets());
    if (!response.hasMore()) {
        break; // we're done
    }
    startFrom = getLast(response.getOffsets());
}
Run Code Online (Sandbox Code Playgroud)

换句话说,我们在0处调用getNext()with。startFrom如果抛出异常,我们就会短路整个过程。否则,如果没有偏移,我们就完成。如果有偏移,我们将它们添加到主列表中。如果没有更多的东西需要获取,我们就完成了。否则,我们将重置startFrom为我们获取的最后一个偏移量并重复。

理想情况下,我想在不阻塞CompletableFuture::get()并返回CompletableFuture<List<Integer>>包含所有偏移量的情况下执行此操作。

我怎样才能做到这一点?我如何编写期货来收集结果?


我正在考虑“递归”(实际上不是在执行中,而是在代码中)

private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) …
Run Code Online (Sandbox Code Playgroud)

java asynchronous java-8 completable-future

6
推荐指数
1
解决办法
1772
查看次数

使用以前链接的值,然后在Java 8中编写lambda

我的同事首选的Java 8编码风格一直在链接异步调用,例如

CompletionStage<E> someMethod() {
    return doSomething().thenCompose(a -> {
      // ...
      return b;
    }).thenCompose(b -> {
      // ...
      return c;
    }).thenCompose(c -> {
      // ...
      return d;
    }).thenApply(d -> {
      // ...
      return e;
    });
}
Run Code Online (Sandbox Code Playgroud)

我有类似上面的内容,但有一个额外的挑战:我需要回忆一些lambda中检索到的值,在后来的lambda中.例如,

CompletionStage<E> someMethod() {
    return doSomething().thenCompose(a -> {
      // ...
      Foo foo = fooDAO.getFoos(a);
      // ...
      return b;
    }).thenCompose(b -> {
      // ...
      return c;
    }).thenCompose(c -> {
      // ...
      Bar bar = barDAO.getBars(foo);
      // ...
      return d;
    }).thenApply(d -> {
      // ... …
Run Code Online (Sandbox Code Playgroud)

java chaining java-8 completable-future

6
推荐指数
1
解决办法
319
查看次数

CompletableFuture allOf 方法行为

我有以下一段java代码:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of Future 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of Future 2";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of Future 3";
        });

        boolean isdone = CompletableFuture.allOf(future1, future2, future3).isDone();

        if …
Run Code Online (Sandbox Code Playgroud)

java completable-future

6
推荐指数
1
解决办法
7891
查看次数

Junit:如何覆盖 CompletableFuture 代码

我是 Junit 新手,最近遇到了这个问题。无论我在代码中使用 CompletableFuture,我都无法编写测试用例。就像下面的Java文件一样

更新

审计服务.java

@Autowired
Executor existingThreadPool;

@Override
public void auditData(List<ErrorDetails> alertList) {
    CompletableFuture.runAsync(() -> {
        if (alertList.isEmpty())
            //privateMethodCall1
        else
           //privateMethodCall2
    }, existingThreadPool);
}
Run Code Online (Sandbox Code Playgroud)

我点击此链接并尝试了下面的解决方案,但仍然出现 NPE for CompletableFuture 类似下面的错误。

审计服务测试.java

@InjectMock
AuditService auditService;

@Mock
private CompletableFuture<Void> completableFuture = null;

@Before
public void setup() {
    MockitoAnnotations.initMocks(this);
    completableFuture = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {}
    },Executors.newSingleThreadExecutor());
}

@Test
public void shouldAuditData() {
    List<ErrorDetails> alertList = new ArrayList();
    auditService.auditData(alertList);
}
Run Code Online (Sandbox Code Playgroud)

错误

java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.screenExecutor(CompletableFuture.java:415)
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
    at …
Run Code Online (Sandbox Code Playgroud)

java junit mockito spring-boot completable-future

6
推荐指数
1
解决办法
2万
查看次数

Future Cancel是否也会在数据库端终止Oracle查询

我正在使用Java Future类执行连接Oracle数据库的程序。但是有时查询的速度比预期的要慢。我可以通过future.cancel方法取消未来。

假设我通过future.cancel取消了Future线程。查询将在Oracle中停止执行,还是仅应用程序线程将被停止/取消,Oracle查询将继续在数据库端运行。

我们正在提交很多future,但有些提交速度很慢。

在这种情况下的实际行为是什么。

谢谢

java completable-future

6
推荐指数
1
解决办法
67
查看次数

从 CompletableFuture 抛出异常会导致 Java 17 中的 get() 和 join() 挂起

当我从 Java 11 切换到 Java 17(从 Ubuntu 20.04 存储库安装 OpenJDK)后,以下代码不起作用:

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;

public class TestClass {

    public static void main(String[] args) {
        CompletableFuture<String> streamFuture = CompletableFuture.supplyAsync(() -> {
            throw MyException.wrapIfNeeded(new Exception("MyException"));
        });
        String result = null;
        try {
            result = streamFuture.get();
        } catch (Exception e) {
            System.out.println("Exception: " + ExceptionUtils.getMessage(e));
        }
        System.out.println("Result: " + Objects.toString(result));
    }

    static class MyException extends RuntimeException {

        private static final long serialVersionUID = 3349188601484197015L;

        public MyException(Throwable cause) {
            super(cause …
Run Code Online (Sandbox Code Playgroud)

java asynchronous freeze completable-future java-17

6
推荐指数
2
解决办法
1618
查看次数