如何将 CircuitBreaker 与 TimeLimiter 和 Bulkhead 结合使用?

faf*_*afl 5 spring-boot spring-webflux resilience4j kotlin-coroutines

我有一个通过 REST 调用依赖项的服务。服务和依赖是微服务架构的一部分,所以我想使用弹性模式。我的目标是:

  • 有一个断路器来保护它在挣扎时的依赖
  • 限制调用可以运行的时间。该服务具有 SLA,并且必须在特定时间进行响应。在超时时,我们使用回退值。
  • 限制对依赖项的并发调用数。通常调用率低,响应快,但我们希望保护依赖免受服务内部的突发和队列请求。

下面是我目前的代码。它有效,但理想情况下我想使用TimeLimiterBulkhead类,因为它们似乎可以协同工作。

我怎样才能更好地写这个?

@Component
class FooService(@Autowired val circuitBreakerRegistry: CircuitBreakerRegistry)
{
    ...

    // State machine to take load off the dependency when slow or unresponsive
    private val circuitBreaker = circuitBreakerRegistry
        .circuitBreaker("fooService")

    // Limit parallel requests to dependency
    private var semaphore = Semaphore(maxParallelRequests)

    // The protected function
    private suspend fun makeHttpCall(customerId: String): Boolean {
        val client = webClientProvider.getCachedWebClient(baseUrl)

        val response = client
            .head()
            .uri("/the/request/url")
            .awaitExchange()

        return when (val status = response.rawStatusCode()) {
            200 -> true
            204 -> false
            else -> throw Exception(
                "Foo service responded with invalid status code: $status"
            )
        }
    }

    // Main function
    suspend fun isFoo(someId: String): Boolean {
        try {
            return circuitBreaker.executeSuspendFunction {
                semaphore.withPermit {
                    try {
                        withTimeout(timeoutMs) {
                            makeHttpCall(someId)
                        }
                    } catch (e: TimeoutCancellationException) {
                        // This exception has to be converted because
                        // the circuit-breaker ignores CancellationException
                        throw Exception("Call to foo service timed out")
                    }
                }
            }
        } catch (e: CallNotPermittedException) {
            logger.error { "Call to foo blocked by circuit breaker" }
        } catch (e: Exception) {
            logger.error { "Exception while calling foo service: ${e.message}" }
        }

        // Fallback
        return true
    }
}
Run Code Online (Sandbox Code Playgroud)

理想情况下,我想写一些类似于文档描述的 Flows 的内容:

// Main function
suspend fun isFoo(someId: String): Boolean {
    return monoOf(makeHttpCall(someId))
        .bulkhead(bulkhead)
        .timeLimiter(timeLimiter)
        .circuitBreaker(circuitBreaker)
}
Run Code Online (Sandbox Code Playgroud)

小智 0

您还可以使用 Resilience4j 的 Bulkhead 代替您自己的 Semaphore 和 Resilience4j 的 TimeLimiter。您可以使用bulkhead.executeSuspendFunction和堆叠您的CircuitBreaker timelimiter.executeSuspendFunction