在使用 Spring Project Reactor 延迟背压后重试?

Joh*_*han 1 spring backpressure spring-boot project-reactor

背景

我正在尝试使用Spring Project Reactor 3.3.0 版实现类似于简单的非阻塞速率限制器的东西。例如,要将数量限制为每秒 100 个请求,我使用以下实现:

myFlux
      .bufferTimeout(100, Duration.ofSeconds(1))
      .delayElements(Duration.ofSeconds(1))
      ..
Run Code Online (Sandbox Code Playgroud)

这适用于我的用例,但如果订阅者没有跟上myFlux发布者的速度,它会(正确地)抛出OverflowException

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
    reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
Run Code Online (Sandbox Code Playgroud)

在我的情况下,重要的是所有元素都被订阅者消耗,因此例如降低背压 ( onBackpressureDrop()) 是不可接受的。

有没有办法,而不是在背压下丢弃元素,只是暂停消息的发布,直到订阅者赶上?在我的情况下myFlux,发布了一个有限但大量的元素,这些元素持久存在于持久数据库中,因此恕我直言,不应该要求删除元素。

Mar*_*pel 5

bufferTimeout(int maxSize, Duration maxTime)请求无限数量的消息,因此对背压不敏感。这使它不适合您的情况。

在概念层面上,bufferTimeout不能对背压敏感,因为您明确指示发布者为每个经过的持续时间发出一批(即使它是空的)。如果订阅者太慢,这将 - 理所当然地 - 导致溢出。

相反,请尝试:

myFlux
   .delayElements(Duration.ofMillis(10))
   .buffer(100)
Run Code Online (Sandbox Code Playgroud)

或者

myFlux
   .buffer(100)
   .delayElements(Duration.ofSeconds(1))
Run Code Online (Sandbox Code Playgroud)

buffer(int maxSize)请求正确的上游数量 ( request * maxSize),因此对来自订阅者的背压很敏感。