Reactor Flux timeout when using a RabbitMQ MessageListenerContainer

Rya*_*via 8 java spring rabbitmq project-reactor

Using a hot publisher model, the following timeout exception occurs roughly 50% of the time in our cloud environment:

[ERROR] reactor.core.scheduler.Schedulers                  - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 300000ms (and no fallback has been configured)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)
reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
datadog.trace.instrumentation.reactor.core.TracingSubscriber.onNext(TracingSubscriber.java:75)
reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

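However, I cannot reproduce this locally. My main theory is that the cause lies in how the Flux is created, and that something goes wrong between the Flux and the MessageListenerContainer. The creation looks like this: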

@Bean
public Flux<RabbitEventPublishEnvelope> masterFlux(
        Queue eventQueue,
        ObjectMapper objectMapper,
        MessageListenerContainerFactory messageListenerContainerFactory) {
    log.info("Create a listener for the topic queue: '{}'", eventQueue.getName());
    MessageListenerContainer mlc = messageListenerContainerFactory
            .createDirectMessageListenerContainer(eventQueue.getName());

    log.info("Define the master Flux for event subscriptions on queue '{}'", eventQueue.getName());
    Flux<RabbitEventPublishEnvelope> masterFlux = Flux.create(emitter -> {
        mlc.setupMessageListener(m -> {
            RabbitEventPublishEnvelope payload = null;
            try {
                log.info("Creating payload");
                payload = objectMapper.readValue(m.getBody(), RabbitEventPublishEnvelope.class);
            } catch (IOException e) {
                log.error("Failed to parse RabbitEventPublishEnvelope:\n{}", m.getBody());
                throw new RuntimeException("Failed to parse RabbitEventPublishEnvelope", e);
            } catch (Exception e) {
                log.error("Unhandled exception in Flux.create(): {}", e.getMessage(), e);
            }
            log.info("Emitting payload");
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            log.info("MLC starting");
            mlc.start();
            log.info("Start recipe event subscription");
        });
        emitter.onDispose(() -> {
            // WARNING: DO NOT issue `mlc.stop();` here or it will cause responses to hang.
            // The main reason this callback handler is implemented is to document what will break our implementation.
            log.info("Done with recipe event subscription");
        });
    });

    log.info("Created master flux for queue = '{}'", eventQueue.getName());
    return masterFlux
            .log("Publishing flux")
            .publish()
            .autoConnect()
            .timeout(Duration.ofMillis(10000))
            .doOnError(error -> log.error("Unhandled exception in masterFlux(): {}", error.getMessage(), error))
            .log("Auto connection successful");
}

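This exception feels like a configuration error, but I have not been able to narrow down what causes the timeout well enough to confirm that. Any help would be greatly appreciated!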

小智 0

Reference: https://www.codota.com/code/java/methods/reactor.core.publisher.Flux/timeout

I suspect the problem is that no timeout was specified when masterFlux was created.
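The error message in the question ends with "(and no fallback has been configured)", and the ErrorCallbackNotImplemented wrapper is typically what Reactor reports when a subscriber has no error handler. As a rough sketch of what supplying a fallback looks like (not a diagnosis of the original issue), the two-argument timeout(Duration, Publisher) overload switches to another publisher instead of signalling a TimeoutException. The class name, the 100 ms window, and the "fallback" value below are made up for illustration, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class TimeoutFallbackSketch {

    // Hypothetical source: emits one item, then goes silent without completing,
    // similar to a listener on a queue that stops receiving messages.
    static Flux<String> quietSource() {
        return Flux.just("a").concatWith(Flux.never());
    }

    // If no item arrives within 100 ms of the previous one, switch to the
    // fallback publisher instead of raising TimeoutException.
    static List<String> withFallback() {
        return quietSource()
                .timeout(Duration.ofMillis(100), Flux.just("fallback"))
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(withFallback());
    }
}
```

Whether a fallback is appropriate depends on the use case. For a long-lived listener on a queue that can legitimately be idle, a per-item timeout may not be what is wanted at all.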

Reference: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

Flux has a static create() overload with the following definition: create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)

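So my guess is that, because no OverflowStrategy has been defined, the code does not know what to do when a message cannot be delivered, and it throws this exception. More information about OverflowStrategy is available at: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/FluxSink.OverflowStrategy.html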
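To make the overload concrete, here is a minimal sketch of Flux.create with an explicit strategy. One caveat: the one-argument create(Consumer) used in the question defaults to OverflowStrategy.BUFFER, so setting the strategy explicitly may not change the timeout behaviour by itself. The class and method names are hypothetical, the emitter pushes three integers in place of RabbitMQ messages, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class OverflowStrategySketch {

    // Builds a Flux from a push-style emitter with an explicit overflow strategy.
    static Flux<Integer> numbers(FluxSink.OverflowStrategy strategy) {
        return Flux.<Integer>create(sink -> {
            for (int i = 0; i < 3; i++) {
                sink.next(i); // stands in for a message arriving from a listener
            }
            sink.complete();
        }, strategy);
    }

    // Subscribes with unbounded demand and gathers everything that was emitted.
    static List<Integer> collect(FluxSink.OverflowStrategy strategy) {
        return numbers(strategy).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(collect(FluxSink.OverflowStrategy.BUFFER));
    }
}
```

The strategy only matters when the emitter produces faster than the subscriber requests. With unbounded demand, as here, every strategy delivers all items.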