xma*_*s79 17 java concurrency asynchronous exception java-8
我需要在我正在处理的异步框架中提交任务,但我需要捕获异常,并在"中止"之前多次重试相同的任务.
我正在使用的代码是:
int retries = 0;
public CompletableFuture<Result> executeActionAsync() {
// Execute the action async and get the future
CompletableFuture<Result> f = executeMycustomActionHere();
// If the future completes with exception:
f.exceptionally(ex -> {
retries++; // Increment the retry count
if (retries < MAX_RETRIES)
return executeActionAsync(); // <--- Submit one more time
// Abort with a null value
return null;
});
// Return the future
return f;
}
Run Code Online (Sandbox Code Playgroud)
这当前不能编译,因为lambda的返回类型是错误的:它期望a Result,但executeActionAsync返回a CompletableFuture<Result>.
如何实现这个完全异步重试逻辑?
Hol*_*ger 13
链接后续重试可以是直截了当的:
public CompletableFuture<Result> executeActionAsync() {
CompletableFuture<Result> f=executeMycustomActionHere();
for(int i=0; i<MAX_RETRIES; i++) {
f=f.exceptionally(t -> executeMycustomActionHere().join());
}
return f;
}
Run Code Online (Sandbox Code Playgroud)
阅读下面的缺点
这只是链接尽可能多的重试,因为这些后续阶段在非例外情况下不会做任何事情.
一个缺点是,如果第一次尝试立即失败,那么f在第一个exceptionally处理程序被链接时异常已经完成,操作将由调用线程调用,完全消除请求的异步性质.通常,join()可能会阻塞一个线程(默认执行程序将启动一个新的补偿线程,但仍然不鼓励它).不幸的是,没有,一个exceptionallyAsync或exceptionallyCompose方法.
一个没有调用的解决方案join()就是
public CompletableFuture<Result> executeActionAsync() {
CompletableFuture<Result> f=executeMycustomActionHere();
for(int i=0; i<MAX_RETRIES; i++) {
f=f.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> executeMycustomActionHere())
.thenCompose(Function.identity());
}
return f;
}
Run Code Online (Sandbox Code Playgroud)
演示如何将"组合"和"特殊"处理程序结合起来.
此外,如果所有重试都失败,则仅报告最后一个异常.更好的解决方案应报告第一个异常,随后将重试添加为抑制异常.如Gili的回答所示,这种解决方案可以通过链接递归调用来构建,但是,为了将这个想法用于异常处理,我们必须使用以下步骤来组合"compose"和"exceptionally":
public CompletableFuture<Result> executeActionAsync() {
return executeMycustomActionHere()
.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> retry(t, 0))
.thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
return executeMycustomActionHere()
.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
.thenCompose(Function.identity());
}
Run Code Online (Sandbox Code Playgroud)
CompletableFuture.failedFuture 是一个Java 9方法,但如果需要,将Java 8兼容的backport添加到代码中将是微不足道的:
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
final CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(t);
return cf;
}
Run Code Online (Sandbox Code Playgroud)
我建议不要使用自己的重试逻辑,而是使用经过验证的库,例如failsafe,它内置了对期货的支持(并且似乎比guava-retrying更受欢迎)。对于您的示例,它看起来像:
private static RetryPolicy retryPolicy = new RetryPolicy()
.withMaxRetries(MAX_RETRIES);
public CompletableFuture<Result> executeActionAsync() {
return Failsafe.with(retryPolicy)
.with(executor)
.withFallback(null)
.future(this::executeMycustomActionHere);
}
Run Code Online (Sandbox Code Playgroud)
也许您应该避免.withFallback(null)并让返回的未来.get()方法抛出结果异常,以便您的方法的调用者可以专门处理它,但这是您必须做出的设计决定。
其他需要考虑的事情包括是否应该立即重试或在两次尝试之间等待一段时间、任何类型的递归退避(当您调用可能关闭的 Web 服务时很有用),以及是否存在特定的异常值得重试(例如,如果方法的参数无效)。
我想我成功了.这是我创建的示例类和测试代码:
public class RetriableTask
{
protected static final int MAX_RETRIES = 10;
protected int retries = 0;
protected int n = 0;
protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();
public RetriableTask(int number) {
n = number;
}
public CompletableFuture<Integer> executeAsync() {
// Create a failure within variable timeout
Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);
// Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts.
// In real application this should be a real future
final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
if (n > 5 && retries > 5)
taskFuture.complete(retries * n);
// Attach the failure future to the task future, and perform a check on completion
taskFuture.applyToEither(timeoutFuture, Function.identity())
.whenCompleteAsync((result, exception) -> {
if (exception == null) {
future.complete(result);
} else {
retries++;
if (retries >= MAX_RETRIES) {
future.completeExceptionally(exception);
} else {
executeAsync();
}
}
});
// Return the future
return future;
}
}
Run Code Online (Sandbox Code Playgroud)
int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
tasks.add(new RetriableTask(i));
}
System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
futures.add(tasks.get(i).executeAsync());
}
System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
try {
CompletableFuture<Integer> future = futures.get(i);
int result = future.get();
System.out.println(i + " result is " + result);
} catch (Exception ex) {
System.out.println(i + " I got exception!");
}
}
System.out.println("Done waiting...");
Run Code Online (Sandbox Code Playgroud)
generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...
Run Code Online (Sandbox Code Playgroud)
主要想法和一些胶水代码(failAfter功能)来自这里.
欢迎任何其他建议或改进.
工具类:
public class RetryUtil {
public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
CompletableFuture<R> f = supplier.get();
for(int i=0; i<maxRetries; i++) {
f=f.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> {
System.out.println("retry for: "+t.getMessage());
return supplier.get();
})
.thenCompose(Function.identity());
}
return f;
}
}
Run Code Online (Sandbox Code Playgroud)
用法:
public CompletableFuture<String> lucky(){
return CompletableFuture.supplyAsync(()->{
double luckNum = Math.random();
double luckEnough = 0.6;
if(luckNum < luckEnough){
throw new RuntimeException("not luck enough: " + luckNum);
}
return "I'm lucky: "+luckNum;
});
}
@Test
public void testRetry(){
CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
System.out.println("async check");
String join = retry.join();
System.out.println("lucky? "+join);
}
Run Code Online (Sandbox Code Playgroud)
输出
async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389
Run Code Online (Sandbox Code Playgroud)
我最近使用guava-retrying库解决了类似的问题.
Callable<Result> callable = new Callable<Result>() {
public Result call() throws Exception {
return executeMycustomActionHere();
}
};
Retryer<Boolean> retryer = RetryerBuilder.<Result>newBuilder()
.retryIfResult(Predicates.<Result>isNull())
.retryIfExceptionOfType(IOException.class)
.retryIfRuntimeException()
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
.build();
CompletableFuture.supplyAsync( () -> {
try {
retryer.call(callable);
} catch (RetryException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
12733 次 |
| 最近记录: |