我一直在学习 Spring Webflux 和反应式编程,但遇到了一个问题,我正在尝试使用 Spring Webclient 解决重试逻辑。我创建了一个客户端并成功调用了一个返回一些 JSON 数据的外部 Web 服务 GET 端点。
当外部服务以503 - Service Unavailable状态响应时,响应包含一个Retry-After标头,其中包含一个值,该值指示在重试请求之前我应该等待多长时间。我想在 Spring Webflux/Reactor 中找到一种方法来告诉 webClient 在 X 周期后重试它的请求,其中 X 是现在和我从响应标头中解析出的 DateTime 之间的差异。
public <T> Mono<T> get(final String url, Class<T> clazz) {
return webClient
.get().uri(url)
.retrieve()
.bodyToMono(clazz);
}
Run Code Online (Sandbox Code Playgroud)
我使用构建器创建了webClient上述方法中使用的变量,并将其作为实例变量存储在类中。
webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
clientCodecConfigurer.defaultCodecs();
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClient = webClientBuilder.build();
Run Code Online (Sandbox Code Playgroud)
我试图理解和使用该类的retryWhen方法Retry,但不知道我是否可以访问或传递那里的响应标头值。
public <T> …Run Code Online (Sandbox Code Playgroud) java project-reactor spring-webflux retrywhen spring-webclient
我正在使用Retrofit 2和RxJava2调用API。如果呼叫失败,在某些情况下(例如,没有Internet连接),我想向用户显示错误对话框,然后让他重试。
当我使用RxJava时,我正在考虑使用它,.retryWhen(...)但是我不知道该怎么做,因为它需要等待用户按下对话框上的按钮。
目前,我显示该对话框,但在用户按下任何按钮之前会重试。另外,我希望在用户按下“取消”时不重试该呼叫。
这是我目前的代码:
private void displayDialog(DialogInterface.OnClickListener positive, DialogInterface.OnClickListener negative) {
AlertDialog.Builder builder = new AlertDialog.Builder(MainActivity.this);
builder.setMessage("Unexpected error, do you want to retry?")
.setPositiveButton("Retry", positive)
.setNegativeButton("Cancel", negative)
.show();
}
private Observable<Boolean> notifyUser() {
final PublishSubject<Boolean> subject = PublishSubject.create();
displayDialog(
(dialogInterface, i) -> subject.onNext(true),
(dialogInterface, i) -> subject.onNext(false)
);
return subject;
}
private void onClick() {
Log.d(TAG, "onClick");
getData()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.retryWhen(attempts -> {
return attempts.zipWith(
notifyUser(),
(throwable, res) -> res);
})
.subscribe(
s -> {
Log.d(TAG, "success"); …Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Angular 拦截器捕获 http 请求的错误,并在重试 503 和 504 响应n时间时将401 处理为注销。
这是我的 http 拦截器:
intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
return next.handle(req).pipe(
catchError(error => {
if (error.status === 401) {
this.authenticationService.logout();
this.router.navigate(['login']);
}
return throwError(error);
}),
retryWhen(errors => errors
.pipe(
concatMap((error, count) => {
if (count < 2 && (error.status == 503 || error.status == 504)) {
return of(error.status);
}
return throwError(error);
}),
delay(500)
)
)
);
}
Run Code Online (Sandbox Code Playgroud)
我 100% 确定这段代码在我编写时有效,因为我对其进行了多次测试,但现在它给了我:
UnsubscriptionErrorImpl
{message: "1 errors occurred during unsubscription:?1) TypeError: …Run Code Online (Sandbox Code Playgroud) 在单元测试重试期间,模拟的响应似乎已缓存,或者很可能我做错了什么。
我正在尝试请求某些内容,如果发生错误,则重试两次,延迟 1 秒。
public Mono<Object> someMethod(String someParam) {
return someInjectedService.doSomething(someParam)
.doOnError(ex -> System.out.println(ex + ": " + System.currentTimeMillis()))
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1)).filter(ex -> ex instanceof SomeCustomException))
.doOnSuccess(result -> doSomethingOnSuccess(result));
}
Run Code Online (Sandbox Code Playgroud)
我的测试:
@Test
void testshouldRequestThrice_whenErrorOccurs() {
// Given
String someParam = "testParam";
when(someInjectedService.doSomething(someParam))
.thenReturn(Mono.error(new SomeCustomException("SomeCustomException"))) // 1st response
.thenReturn(Mono.error(new SomeCustomException("SomeCustomException"))) // 2nd response
.thenReturn(Mono.just("SomeValidResponse")); // 3rd valid response
// When
var result = testService.someMethod(someParam).block();
// Then
// Initial request, followed by two retries
verify(someInjectedService, times(3)).doSomething(someParam);
}
Run Code Online (Sandbox Code Playgroud)
这someInjectedService是一个模拟。我的计划是返回异常两次,并在第三次请求时返回有效响应。但我得到的是:
org.mockito.exceptions.verification.TooFewActualInitations: someInjectedService.doSomething("testParam");
通缉 …
我正在调用 API 来获取随机详细信息。问题是,有时我会收到 502 错误作为网关错误,并且它也可能由于网络连接错误而中断。下面是我的API调用代码
// COMPONENT API CALL SUBSCRIBE
this._service.post('randomAPI/getdetails', filters).subscribe((response: any) => {
this.itemList = response;
});
// SHARED SERVICE
post<T>(url: string, body: any): Observable<T> {
return this.httpClient.post<T>(url, body);
}
Run Code Online (Sandbox Code Playgroud)
每当我收到 500 或 502 服务器错误时,我都会使用拦截器路由到错误页面以通知用户服务器问题。
相反,如果失败,我可以让 API 在组件级别或拦截器级别再尝试一次,然后路由到错误页面吗?
// INTERCEPTOR
intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
return next.handle(request).pipe(catchError(error => {
if (error.status === 401 || error.status === 400 || error.status === 403) {
this.router.navigateByUrl('abort-access', { replaceUrl: true });
} else if (error.status === 500 || error.status === 502) …Run Code Online (Sandbox Code Playgroud) 我正在尝试对自定义 RxJS 运算符进行单元测试。该运算符非常简单,它使用 RetryWhen 重试失败的 HTTP 请求,但有延迟,并且仅当 HTTP Error 在 500 范围内时才会重试。使用 jasmine,这是在 Angular 应用程序中。
我看过这个:
不幸的是,更新 SpyOn 调用似乎不会更改连续重试时返回的可观察值。每次重试时,都会使用原始间谍值重试。
我还查看了一堆 rxjs 大理石示例,但似乎都不起作用。我不确定这里是否可以使用 rxjs 弹珠,因为(AFAIK)无法模拟首先提交错误的可观察值,然后在后续尝试中提交成功的可观察值的情况。
该代码基本上是此代码的克隆: https: //blog.angularindepth.com/retry-failed-http-requests-in-angular-f5959d486294
export function delayedRetry(delayMS: number, maxRetry) {
let retries = maxRetry;
return (src: Observable<any>) =>
src.pipe(
retryWhen((errors: Observable<any>) => errors.pipe(
delay(delayMS),
mergeMap(error =>
(retries-- > 0 && error.status >= 500) ? of(error) : throwError(error))
))
);
}
Run Code Online (Sandbox Code Playgroud)
我希望能够证明它可以订阅一个可观察的对象,该可观察对象在第一次尝试时返回错误,但随后返回成功的响应。最终订阅应该显示 observable 发出的任何成功值。
预先感谢您提供任何见解。
retrywhen ×7
angular ×3
rxjs ×3
http ×2
java ×2
android ×1
angular5 ×1
interceptor ×1
rx-java2 ×1
typescript ×1
unit-testing ×1
webflux ×1