我正在尝试使用RxJava和Java 8的CompletableFuture类,并且不太了解如何处理超时条件.
import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;
// ...
Observable<String> doSomethingSlowly() {
CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
// this call may be very slow - if it takes too long,
// we want to time out and cancel it.
return processor.slowExternalCall();
});
return toObservable(task);
}
// ...
doSomethingSlowly()
.single()
.timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
Run Code Online (Sandbox Code Playgroud)
这基本上有效(如果达到超时三秒,则发布"超时").然而,我还想取消我已经包含的未来任务Observable- 是否可能采用以RxJava为中心的方法?
我知道一个选项是自己处理超时,使用task.get(3, TimeUnit.SECONDS),但我想知道是否可以在RxJava中完成所有任务处理.
在C#中,Task类有ContinueWith方法,当任务运行完成状态时,ContinueWith方法将被调用,而在JAVA中,是否有一些方法如ContinueWith?
我知道番石榴listenablefuture,但它使用一个新的线程来等待'任务'完成,它是否等于C# ContinueWith?
和JAVA 8 具有相同的效果,那么C#的区别是 什么?CompletableFuture whenCompleteContinueWith listenablefuture CompletableFuture
谢谢!
我正在从我的 Spring Boot 应用程序调用不同的其他 Web 服务。为了提高性能而不是顺序调用它们,我想实现CompletableFuture.supplyAsync()异步执行。
我正在调用的服务,其中一些是内部的,一些是外部的。因此,对于内部,我直接调用它们的 api 接口,这些接口存在于 Maven 依赖项中,而对于外部接口,我正在使用javax.ws.rs.client.WebTarget
在实现调用时,CompletableFuture.supplyAsync()内部的客户端类会成功执行,而具有依赖关系的客户端类则会WebTarget因未创建 bean 而@Inject失败。scopedTarget.XXService_nameXX_wt
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.charityserv_wt': Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [javax.ws.rs.client.WebTarget]: Factory method 'target' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.coreContext': Scope 'request' is not active for the current thread
Run Code Online (Sandbox Code Playgroud)
调用代码:
CompletableFuture<SearchResult> future = CompletableFuture.supplyAsync(() -> searchServiceClientImpl.getsearchSearchRequest(encryptedAccountNumber));
Run Code Online (Sandbox Code Playgroud)
服务客户端类:
@Component
public class …Run Code Online (Sandbox Code Playgroud) 我需要在Java 8中创建一个异步,非阻塞的任务,我想使用CompletableFutures,但是我不确定它是否满足我的需求。
为了简化这种情况,假设我们有一个API,可以为用户检索一些数据,但同时希望启动一个单独的任务来执行一些操作。我不需要也不想等待此单独的任务完成,我想立即将响应发送给用户。模拟代码中的一个示例:
public Response doSomething(params) {
Object data = retrieveSomeData(params);
// I don't want to wait for this to finish, I don't care if it succeeds or not
doSomethingNoWait(data);
return new Response(data);
}
Run Code Online (Sandbox Code Playgroud)
我正在看CompletableFutures,像这样:
CompletableFuture.supplyAsync(this::doSomethingNoWait)
.thenApply(this::logSomeMessage);
Run Code Online (Sandbox Code Playgroud)
我想知道那是正确的方法吗?在doSomethingNoWait完成必须做的事情之前,响应会返回给用户吗?
谢谢!
我有一个异步 API,它本质上是通过分页返回结果
public CompletableFuture<Response> getNext(int startFrom);
Run Code Online (Sandbox Code Playgroud)
每个Response对象都包含一个偏移量列表startFrom和一个标志,该标志指示是否还有更多剩余元素,因此是否getNext()需要发出另一个请求。
我想编写一个方法来遍历所有页面并检索所有偏移量。我可以像这样以同步方式编写它
int startFrom = 0;
List<Integer> offsets = new ArrayList<>();
for (;;) {
CompletableFuture<Response> future = getNext(startFrom);
Response response = future.get(); // an exception stops everything
if (response.getOffsets().isEmpty()) {
break; // we're done
}
offsets.addAll(response.getOffsets());
if (!response.hasMore()) {
break; // we're done
}
startFrom = getLast(response.getOffsets());
}
Run Code Online (Sandbox Code Playgroud)
换句话说,我们在0处调用getNext()with。startFrom如果抛出异常,我们就会短路整个过程。否则,如果没有偏移,我们就完成。如果有偏移,我们将它们添加到主列表中。如果没有更多的东西需要获取,我们就完成了。否则,我们将重置startFrom为我们获取的最后一个偏移量并重复。
理想情况下,我想在不阻塞CompletableFuture::get()并返回CompletableFuture<List<Integer>>包含所有偏移量的情况下执行此操作。
我怎样才能做到这一点?我如何编写期货来收集结果?
我正在考虑“递归”(实际上不是在执行中,而是在代码中)
private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) …Run Code Online (Sandbox Code Playgroud) 我的同事首选的Java 8编码风格一直在链接异步调用,例如
CompletionStage<E> someMethod() {
return doSomething().thenCompose(a -> {
// ...
return b;
}).thenCompose(b -> {
// ...
return c;
}).thenCompose(c -> {
// ...
return d;
}).thenApply(d -> {
// ...
return e;
});
}
Run Code Online (Sandbox Code Playgroud)
我有类似上面的内容,但有一个额外的挑战:我需要回忆一些lambda中检索到的值,在后来的lambda中.例如,
CompletionStage<E> someMethod() {
return doSomething().thenCompose(a -> {
// ...
Foo foo = fooDAO.getFoos(a);
// ...
return b;
}).thenCompose(b -> {
// ...
return c;
}).thenCompose(c -> {
// ...
Bar bar = barDAO.getBars(foo);
// ...
return d;
}).thenApply(d -> {
// ... …Run Code Online (Sandbox Code Playgroud) 我有以下一段java代码:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of Future 3";
});
boolean isdone = CompletableFuture.allOf(future1, future2, future3).isDone();
if …Run Code Online (Sandbox Code Playgroud) 我是 Junit 新手,最近遇到了这个问题。无论我在代码中使用 CompletableFuture,我都无法编写测试用例。就像下面的Java文件一样
更新
审计服务.java
@Autowired
Executor existingThreadPool;
@Override
public void auditData(List<ErrorDetails> alertList) {
CompletableFuture.runAsync(() -> {
if (alertList.isEmpty())
//privateMethodCall1
else
//privateMethodCall2
}, existingThreadPool);
}
Run Code Online (Sandbox Code Playgroud)
我点击此链接并尝试了下面的解决方案,但仍然出现 NPE for CompletableFuture 类似下面的错误。
审计服务测试.java
@InjectMock
AuditService auditService;
@Mock
private CompletableFuture<Void> completableFuture = null;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
completableFuture = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {}
},Executors.newSingleThreadExecutor());
}
@Test
public void shouldAuditData() {
List<ErrorDetails> alertList = new ArrayList();
auditService.auditData(alertList);
}
Run Code Online (Sandbox Code Playgroud)
错误
java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.screenExecutor(CompletableFuture.java:415)
at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
at …Run Code Online (Sandbox Code Playgroud) 我正在使用Java Future类执行连接Oracle数据库的程序。但是有时查询的速度比预期的要慢。我可以通过future.cancel方法取消未来。
假设我通过future.cancel取消了Future线程。查询将在Oracle中停止执行,还是仅应用程序线程将被停止/取消,Oracle查询将继续在数据库端运行。
我们正在提交很多future,但有些提交速度很慢。
在这种情况下的实际行为是什么。
谢谢
当我从 Java 11 切换到 Java 17(从 Ubuntu 20.04 存储库安装 OpenJDK)后,以下代码不起作用:
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;
public class TestClass {
public static void main(String[] args) {
CompletableFuture<String> streamFuture = CompletableFuture.supplyAsync(() -> {
throw MyException.wrapIfNeeded(new Exception("MyException"));
});
String result = null;
try {
result = streamFuture.get();
} catch (Exception e) {
System.out.println("Exception: " + ExceptionUtils.getMessage(e));
}
System.out.println("Result: " + Objects.toString(result));
}
static class MyException extends RuntimeException {
private static final long serialVersionUID = 3349188601484197015L;
public MyException(Throwable cause) {
super(cause …Run Code Online (Sandbox Code Playgroud)