我正在使用 Spring 5 WebClient 从 REST api 重复获取正在运行的进程的某些状态。
在这里的帮助下,我现在来到了这个解决方案:
webClient.get().uri(...).retrieve.bodyToMono(State.class)
.repeat()
.skipUntil(state -> stateFinished())
.limitRequest(1)
.subscribe(state -> {...});
Run Code Online (Sandbox Code Playgroud)
虽然这有效,但 get 请求以非常高的速率被触发。将请求速率限制为每秒 1 个请求的正确方法是什么?
我尝试使用,delayElements(Duration.ofSeconds(1))但这只会延迟结果,而不是请求本身。
我在reative API spring boot应用程序中使用resilience4j进行容错。我可以看到,即使 Mono 返回错误,所有事件都被视为成功。
服务层
@Override
@CircuitBreaker(name = "member-service")
public Mono<String> getMemberInfo(String memberId) {
return wsClient.getMemberInfo(memberId);
// This call will return WSException whenever there is 4XX or 5XX error
}
Run Code Online (Sandbox Code Playgroud)
application.yml配置
resilience4j.circuitbreaker:
backends:
member-service:
ringBufferSizeInClosedState: 1
ringBufferSizeInHalfOpenState: 2
waitInterval: 10000
failureRateThreshold: 75
registerHealthIndicator: true
recordExceptions:
- com.xx.WSException
ignoreExceptions:
- com.xxx.WSClientException
Run Code Online (Sandbox Code Playgroud)
我故意更改了 URI 路径,以便 WebClient 始终返回 404 错误,从而引发 WSException。当我看到下面的端点时,类型总是成功。我错过了什么?
http://localhost:8088/circuitbreaker-events/member-service
{
"circuitBreakerEvents": [
{
"circuitBreakerName": "member-service",
"type": "SUCCESS",
"creationTime": "2019-02-18T21:56:21.588+05:30[Asia/Calcutta]",
"durationInMs": 6
},
{
"circuitBreakerName": "member-service",
"type": "SUCCESS", …Run Code Online (Sandbox Code Playgroud) 我尝试使用 Spring 5 下载 PDF 文件。以下是我的代码:
@RequestMapping(path = "/pdf", method = { RequestMethod.POST }, produces = MediaType.APPLICATION_PDF_VALUE)
public Mono<ResponseEntity<Resource>> getPDF(ServerHttpRequest httpRequest)
{
File file = new File(filepath);
ResponseEntity<Resource> resource = getResource(file);
return Mono.justOrEmpty(resource);
}
public ResponseEntity<Resource> getResource(File file) {
final InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=" + file.getName());
headers.add("Cache-Control", "no-cache, no-store, must-revalidate");
headers.add("Pragma", "no-cache");
headers.add("Expires", "0");
return ResponseEntity.ok().headers(headers).contentType(MediaType.APPLICATION_PDF).contentLength(file.length()).body(new InputStreamResource(inputStream));
}
Run Code Online (Sandbox Code Playgroud)
但我收到以下异常:
java.lang.NoSuchMethodError: reactor.core.publisher.Flux.doOnDiscard(Ljava/lang/Class;Ljava/util/function/Consumer;)Lreactor/core/publisher/Flux;
在 org.springframework.core.io.buffer.DataBufferUtils.readByteChannel(DataBufferUtils.java:105) 在 org.springframework.core.io.buffer.DataBufferUtils.read(DataBufferUtils.java:202) 在 org.springframework.core.io .buffer.DataBufferUtils.read(DataBufferUtils.java:170) at org.springframework.core.codec.ResourceEncoder.encode(ResourceEncoder.java:76) at …
我在 Vert.x 应用程序中使用了一个库,它返回Project Reactor类型Mono。
我有一个 Verticle 接收这种反应类型,并打算通过事件总线将内容发送到另一个 Verticle:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
@Override
public void start() throws Exception
{
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body()); …Run Code Online (Sandbox Code Playgroud) java reactive-programming vert.x project-reactor reactive-streams
我目前正在使用ExchangeFilterFunction记录进入ClientRequest实例内的所有标头,我正在访问它们做request.headers().
执行我的过滤器后,HttpClient下面会添加某些标头,例如那个标头Accept-Encoding,因此不会被记录,因为它们永远不会被添加到ClientRequest实例中。
我的过滤器看起来像这样:
public class WebClientLoggingFilter implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(final ClientRequest clientRequest, final ExchangeFunction next) {
return Mono.just(clientRequest)
.doOnNext(request -> log(request.headers().toString()))
.flatMap(next::exchange)
.doOnNext(clientResponse -> logData(clientRequestData, message, clientResponse));
}
}
Run Code Online (Sandbox Code Playgroud)
此过滤器将所有内容记录在ClientRequest标头中,但随后HttpClient会发挥其魔力,即使在响应返回之后,它也永远不会到达 ClientRequest。来自 Netty的代码示例。
有没有其他方法可以进行日志记录,以便我可以访问真正通过网络发送的内容?
背景
我正在尝试使用Spring Project Reactor 3.3.0 版实现类似于简单的非阻塞速率限制器的东西。例如,要将数量限制为每秒 100 个请求,我使用以下实现:
myFlux
.bufferTimeout(100, Duration.ofSeconds(1))
.delayElements(Duration.ofSeconds(1))
..
Run Code Online (Sandbox Code Playgroud)
这适用于我的用例,但如果订阅者没有跟上myFlux发布者的速度,它会(正确地)抛出OverflowException:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
Run Code Online (Sandbox Code Playgroud)
在我的情况下,重要的是所有元素都被订阅者消耗,因此例如降低背压 ( onBackpressureDrop()) 是不可接受的。
题
有没有办法,而不是在背压下丢弃元素,只是暂停消息的发布,直到订阅者赶上?在我的情况下myFlux,发布了一个有限但大量的元素,这些元素持久存在于持久数据库中,因此恕我直言,不应该要求删除元素。
使用 Spring Boot 学习 Reactor。
使用示例 API:
https://jsonplaceholder.typicode.com/todos/1
{
"userId": 1,
"id": 1,
"title": "delectus aut autem",
"completed": false
}
Run Code Online (Sandbox Code Playgroud)
想要将上面的映射到一个对象(定义一个 pojo SingleUser)并打印输出。
private WebClient webClient = WebClient.create("https://jsonplaceholder.typicode.com");
private Mono<ClientResponse> responseMono = webClient.get()
.uri("/todos/1")
.accept(MediaType.APPLICATION_JSON)
.exchange();
public String getResult() {
return ">> result = " + responseMono.flatMap(res -> res.bodyToMono(String.class)).block();
}
Run Code Online (Sandbox Code Playgroud)
使用上述内容时..结果是:
>> result = {
"userId": 1,
"id": 1,
"title": "delectus aut autem",
"completed": false
}
Run Code Online (Sandbox Code Playgroud)
使用 Flux 时如何迭代和打印所有值,如下所示?
public Flux<SingleUser> listUsers1() {
return webClient.get()
.uri("/todos/1")
.retrieve()
.bodyToFlux(SingleUser.class);
} …Run Code Online (Sandbox Code Playgroud) 在使用项目反应器库的 Java 响应式编程期间,我偶然发现了一种模式,我想知道是否有开箱即用的支持?
所以我想要下面的代码:
Mono.just("hello")
.flatMap(hello -> reactiveAction(hello).thenReturn(hello))
..
.;
Run Code Online (Sandbox Code Playgroud)
变成类似的东西:
Mono.just("hello")
.coolOperation(this::reactiveAction)
..
.;
Run Code Online (Sandbox Code Playgroud)
我不能使用 doOnNext 因为我想在 reactAction 中做的不是副作用。和反应动作是:
Mono<Integer> reactiveAction(String text){
return ....
}
Run Code Online (Sandbox Code Playgroud) 下面的代码并行执行所有 Web 请求 (webClient),不考虑我输入的限制parallel(5)。
Flux.fromIterable(dataListWithHundredsElements)
.parallel(5).runOn(Schedulers.boundedElastic())
.flatMap(element ->
webClient.post().
.bodyValue(element)
.retrieve()
.bodyToMono(String.class)
.doOnError(err -> element.setError(Utils.toString(err)))
.doOnSuccess(r -> element.setResponse(r))
)
.sequential()
.onErrorContinue((e, v) -> {})
.doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
.subscribe();
Run Code Online (Sandbox Code Playgroud)
我想知道是否可以根据中指定的值执行请求parallel(5)以及如何最好地执行请求?
一个细节,此代码是一个 Spring MVC 应用程序,我正在向外部服务发出请求。
更新 01
实际上 Flux 创建了 5 个线程,但是,所有请求(WebClient Mono)都是同时执行的。
我想要的是一次执行 5 个请求,所以当 1 个请求结束时,另一个请求会启动,但任何时候都不应该有 5 个以上的并行请求。
由于 Mono 也是一种响应式类型,在我看来 Flux 的 5 个线程调用它并且没有被阻塞,实际上发生的情况是所有请求都是并行发生的。
更新 02 - 外部服务日志
这是外部服务的日志,大约需要 5 秒才能响应。正如您在下面的日志中看到的,同时有 14 个请求。
2020-05-08 11:53:56.655 INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8 …Run Code Online (Sandbox Code Playgroud) 我正在使用 Flux 来构建我的反应式管道。在管道中,我需要调用 3 个不同的外部系统 REST API,它们的访问速率非常严格。如果我违反了每秒速率阈值,我将受到指数级的限制。每个系统都有自己的阈值。
我正在使用 Spring WebClient 进行 REST API 调用;在 3 个 API 中,其中 2 个是 GET,1 个是 POST。
在我的反应器管道中,WebClient 被包裹在 flatMap 中以执行 API 调用,如下面的代码:
WebClient getApiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getApiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string
Flux.generator(generator) // Generator pushes the elements from source 1 at a time
// make call to 1st API …Run Code Online (Sandbox Code Playgroud) spring-boot project-reactor reactive-streams spring-webflux spring-webclient
project-reactor ×10
spring ×5
spring-boot ×4
java ×3
backpressure ×1
mono ×1
netty ×1
publisher ×1
resilience4j ×1
vert.x ×1