我在反序列化响应期间遇到问题。假设我使用 webclient 来自第三方的响应。
Response :
{
"name":"FirstName",
"type":"Steel",
"Fee":{
"id":"1234",
"name":"FeeFirstName"
},
"address":"2nd Street"
}
Run Code Online (Sandbox Code Playgroud)
这就是我的 pojo 类的样子
public class Fee{} //generic OR empty class
public class Foo{
private String name;
private String type;
private Fee fee;
private String address;
}
Run Code Online (Sandbox Code Playgroud)
我的网络客户端获取响应代码:
@Autowired
private WebClient fooWebClient;
public Foo getFoo()
{
try{
return fooWebClient.get()
.uri(uriBuilder -> uriBuilder.path("/foo/fee").build("123"))
.header(HttpHeaders.CONTENT_TYPE,MediaType.APPLICATION_JSON_VALUE)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Foo.class)
.block();
}catch(Exception e){throw new ApiClientException(e.getMessage());}
}
Run Code Online (Sandbox Code Playgroud)
上面的 webclient getFoo() 代码没有给我完整的响应,费用是空白的,说明“类没有字段”。其余的值都会正确响应。费用需要为空,因为任何其他对象也可以出现。
请让我知道如何反序列化整个响应。
我已经基于该spring-boot-starter-webflux工件实现了一个网络客户端。
代码:
\n// create client bean to use throughout services\n@Bean\npublic WebClient geoserverWebClient() {\n // to not fall prone to DataBufferLimitException\n final int size = 16 * 1024 * 1024;\n final ExchangeStrategies s\xce\xa9trategies = ExchangeStrategies.builder()\n .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))\n .build();\n \n return WebClient.builder()\n .exchangeStrategies(strategies)\n .baseUrl(geoserverURL) \n .defaultHeader(HttpHeaders.AUTHORIZATION, "Basic " + Base64Utils.encodeToString(geoserverBasicAuth.getBytes()))\n .build();\n}\nRun Code Online (Sandbox Code Playgroud)\n到目前为止,我在使用它时没有遇到任何问题,例如:
\n// send getMap WMS to geoserver\npublic Mono<byte[]> getMap(String requestURL){\n\n return geoserverWebClient\n .get()\n .uri(requestURL)\n .retrieve()\n .bodyToMono(byte[].class);\n}\nRun Code Online (Sandbox Code Playgroud)\n但是,如果我onStatus向其中添加该方法以检查 HTTP 错误,则会收到错误:“WebClient.ResponseSpec 类型中的方法 …
编辑:这里https://github.com/wujek-srujek/reactor-retry-test是包含所有代码的存储库。
我有以下 SpringWebClient代码来 POST 到远程服务器(为简洁起见,没有导入的 Kotlin 代码):
private val logger = KotlinLogging.logger {}
@Component
class Client(private val webClient: WebClient) {
companion object {
const val maxRetries = 2L
val firstBackOff = Duration.ofSeconds(5L)
val maxBackOff = Duration.ofSeconds(20L)
}
fun send(uri: URI, data: Data): Mono<Void> {
return webClient
.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(data)
.retrieve()
.toBodilessEntity()
.doOnSubscribe {
logger.info { "Calling backend, uri: $uri" }
}
.retryExponentialBackoff(maxRetries, firstBackOff, maxBackOff, jitter = false) {
logger.debug { "Call to $uri failed, …Run Code Online (Sandbox Code Playgroud) 我正在使用 Flux 来构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。如果我违反了每秒速率阈值,我将受到指数级的限制。每个系统都有自己的阈值。
我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。
在我的反应器管道中,WebClient 被包裹在 flatMap 中以执行 API 调用,如下面的代码:
WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string
Flux.generator(generator) // Generator pushes the elements from source 1 at a time
// make call to 1st API …Run Code Online (Sandbox Code Playgroud) spring-boot project-reactor reactive-streams spring-webflux spring-webclient
想问一个关于两种技术的问题。
我们首先从一个必须调用其他第三方 rest API 的应用程序开始,因此,我们在 SpringBoot Webflux 项目中使用了 Webflux WebClient。到目前为止一切顺利,我们有一个成功的应用程序有一段时间了。
然后第三方应用程序(不是我们的)开始变得不稳定,有时会无法满足我们的请求。我们必须实现某种重试逻辑。执行重试逻辑后,如WebClient重试,业务流程正常。我们主要是直接从框架中提取逻辑。例如,最近在 SpringOne 上@simon-baslé、Cancel、Retry 和 Timeouts 的演讲给出了许多工作示例。
.retryWhen(backoff(5, Duration.ofMillis(10).maxbackOff(Duration.ofSeconds(1)).jitter(0.4)).timeout(Duration.ofSeconds(5)
Run Code Online (Sandbox Code Playgroud)
另一方面,最近有越来越多的应用程序转向断路器模式。由 Resilience4J 支持的 Spring Cloud Circuit Breaker 项目是一个流行的实现,它使用 Resilience4J 来实现诸如断路器、隔板和重试等模式。
因此,我有一个问题,在重试方面使用/组合两者是否有好处?
将两者放在一起有什么好处吗?有什么缺点吗?
或者只有两者之一就足够了,在这种情况下,请选择哪一个?为什么?
谢谢
circuit-breaker spring-webflux resilience4j spring-webclient
我正在使用 Spring WebClient 调用 REST API。我想根据响应抛出错误。例如,如果 body 出现错误 (400)
{"error": "error message1 "}
Run Code Online (Sandbox Code Playgroud)
然后我想抛出一个带有“error message1”的错误。如果主体有错误(400),则同样的方法
{"error_code": "100020"}
Run Code Online (Sandbox Code Playgroud)
然后我想抛出一个 error_cde 100020 的错误。我想以非阻塞的方式做到这一点。
public Mono<Response> webclient1(...) {
webClient.post().uri(createUserUri).header(CONTENT_TYPE, APPLICATION_JSON)
.body(Mono.just(request), Request.class).retrieve()
.onStatus(HttpStatus::isError, clientResponse -> {
//Error Handling
}).bodyToMono(Response.class);
}
Run Code Online (Sandbox Code Playgroud) 首先,我是 Spring Webflux 的新手,并尝试在设置反应式 Spring Boot 项目时进行 POC。我有一个用例,我需要将检索到的实体类(PartyDTO)转换为 Mono 对象(Person :是没有构造函数的第三方业务对象,我无法修改它)。我用谷歌搜索但无法找到与我的用例匹配的答案。
第三方对象:
public class Person {
// no constructors
private Integer custId;
private String fullname;
private LocalDate date;
//
getters and setters
}
Run Code Online (Sandbox Code Playgroud)
我的课程如下:
@Table("party")
public class PartyDTO {
@Id
private Integer party_id;
private String name;
private LocalDate start_date;
}
Run Code Online (Sandbox Code Playgroud)
调用我的存储库的服务类。
@Service
public class ServiceImpl{
@Override
public Mono<Person> getParty(String partyId) {
return
partyRepository.findById(Integer.parseInt(partyId)).flatMap(//mapper to convert PartyDTO to Person goes here);
}
}
Run Code Online (Sandbox Code Playgroud)
我尝试将平面地图与我的自定义映射器一起使用,如上所示,但它不起作用。有人可以建议我如何以非阻塞方式实现这一点(如果第三方 bean 映射器支持非阻塞方法,它也很好)
我面临 Webclient 和 mockito 的问题下面是我的服务代码:
public Flux<Config> getConfigs(String param1, String param2) {
MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
if(!StringUtils.isEmpty(param2)) {
queryParams.add("name", param2);
}
String path = "api/v1/config";
return webClient.get().uri(uriBuilder -> uriBuilder.path(path)
.queryParams(queryParams)
.build())
.retrieve().bodyToFlux(Config.class)
.doOnError(MyRuntimeException::throwError);
}
Run Code Online (Sandbox Code Playgroud)
我尝试的测试用例失败并出现以下错误:
Strict stubbing argument mismatch. Please check:
- this invocation of 'uri' method:
requestHeadersUriSpec.uri(
com.rs.para.conf.service.ConfigServiceImpl$$Lambda$309/1334433160@3925299f
);
Run Code Online (Sandbox Code Playgroud)
测试用例代码:
@Test
public void testConfig() {
List<Config> configs = new ArrayList<>();
doReturn(requestHeadersUriMock).when(webClientMock).get();
doReturn(requestHeadersMock).when(requestHeadersUriMock)
.uri(anyString());
doReturn(responseMock).when(requestHeadersMock).retrieve();
doReturn(Flux.fromIterable(configs)).when(responseMock).bodyToFlux(Config.class);
Flux<Config> configFlux = configService.getConfigs("100005", "test");
}
Run Code Online (Sandbox Code Playgroud)
我可以在没有查询参数的情况下运行正常的 GET,但是当我尝试运行这个具有查询参数的测试时,它会给我错误 PS:我不想使用 mockwebserver
我需要使用如下所示的 URL 调用外部服务...
获取https://api.staging.xxxx.com/locations?where={"account":"bob"}
这不是我的服务,我对其没有影响力,我目前的代码库正在使用 Spring WebClient。
WebClient.create("https://api.staging.xxxx.com/")
.get()
.uri(uriBuilder -> uriBuilder.path("locations?where={'account':'bob'}").build())
Run Code Online (Sandbox Code Playgroud)
由于 WebClient 看到 { 括号,它会尝试将值注入到 URL 中,但我不希望这样做。
谁能建议我如何使用 Spring WebClient?
否则我将恢复到 OKHttp 或另一个基本客户端来发送此请求。
我是反应式编程的新手,遇到了这个问题:
[
{
"customerDTO": {
"scanAvailable": true
},
"bankAccountDTOs": {
"scanAvailable": true,
"prefetch": -1
}
}
]
Run Code Online (Sandbox Code Playgroud)
数据传输对象:
public class ResponseClientDTO {
private Mono<CustomerDTO> customerDTO;
private Flux<BankAccountDTO> bankAccountDTOs;
}
Run Code Online (Sandbox Code Playgroud)
服务:
public Flux<ResponseClientDTO> getCustomerWithBankAccounts(String customerId){
Flux<BankAccountDTO> bankAccounts = webClient
.get()
.uri(uriBuilder ->
uriBuilder.path("customers")
.queryParam("customerId", customerId).build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(BankAccountDTO.class);
Mono<CustomerDTO> cMono = findOne(customerId);
ResponseClientDTO responseClientDTO = new ResponseClientDTO();
responseClientDTO.setBankAccountDTOs(bankAccounts);
responseClientDTO.setCustomerDTO(cMono);
return Flux.just(responseClientDTO);
}
Run Code Online (Sandbox Code Playgroud)
我从另一个 API 查询端点,它返回一个Flux<BankAccounts>. 我想找到客户所有的银行账户。
我正在使用spring webclient发出带有{comment_count}的url的Facebook图形api请求
但是,得到这个例外
java.lang.IllegalArgumentException: Not enough variable values available to expand reactive spring
Run Code Online (Sandbox Code Playgroud)
代码段:
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@Component
public class Stackoverflow {
WebClient client = WebClient.create();
public Mono<Post> fetchPost(String url) {
// Url contains "comments{comment_count}"
return client.get().uri(url).retrieve()
.bodyToMono(Post.class);
}
}
Run Code Online (Sandbox Code Playgroud)
我知道resttemplate的解决方案,但是我需要使用spring webclient。
illegalargumentexception facebook-graph-api spring-boot spring-webflux spring-webclient
我在尝试缓存Mono由WebClient. 代码是这样的:
public Mono<Token> authenticate() {
return cachedTokenMono = cachedTokenMono
.switchIfEmpty(
Mono.defer(() ->
getToken())
.cache(token ->
Duration.between(Instant.now(), token.getExpires().toInstant()),
(Throwable throwable) -> Duration.ZERO,
() -> Duration.ZERO));
}
Run Code Online (Sandbox Code Playgroud)
目的是Mono用于接收 a 的Token被缓存,直到令牌过期。令牌过期后,缓存Mono变为空并请求新令牌。这按预期工作,但不幸的switchIfEmpty()是实际上并没有“切换”,而是包装了源代码Mono。结果,随着越来越多的包装SwitchIfEmptyMono被创建,这会造成内存泄漏。在这种情况下,正确的模式是什么?有没有办法Mono用新的代替空的?
java reactive-programming project-reactor spring-webflux spring-webclient
spring-webclient ×12
spring ×5
java ×4
spring-boot ×3
http-error ×1
junit ×1
mockito ×1
resilience4j ×1
undefined ×1