Spring Cloud Stream Reactive - Handling Messages exception

Tom*_*her 4 reactor spring-rabbit kotlin spring-cloud-stream

I'm currently working on a project in Kotlin that uses rabbit with reactor to receive messages of some DTO type, and dispatch them if they are up to certain criteria. In the process of testing my code, I tried to simulate bad Message input(Since the message is coming from an external service) and see the behaviour of the subscriber. Once I got a bad input message, the subscriber stopped listening to any new input and threw the following exception:

 org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage
Run Code Online (Sandbox Code Playgroud)

I then tried to run the official example from spring and change the Supplier to send bad data on the first dispatch, then send valid data and see the behaviour.

On the Supplier's side, I have added an indexer to send a bad message only on the first run.

//Following source and sinks are used for testing only.
//Test source will send data to the same destination where the processor receives data
//Test sink will consume data from the same destination where the processor produces data
// ------ New Code -------
static int x = 0;
// ------ END New Code -------

static class TestSource {

    private AtomicBoolean semaphore = new AtomicBoolean(true);
    private Random random = new Random();
    private int[] ids = new int[]{100100, 100200, 100300};

    @Bean
    public Supplier<?> sendTestData() {

        return () -> {
            // ------ New Code -------
            if(x==0) {
                return "hey";
            }
            x++;
            // ------ END New Code -------
            int id = ids[random.nextInt(3)];
            int temperature = random.nextInt((102 - 65) + 1) + 65;
            Sensor sensor = new Sensor();
            sensor.setId(id);
            sensor.setTemperature(temperature);
            return sensor;
        };
    }
}
Run Code Online (Sandbox Code Playgroud)

The subscriber side:

@Bean
public Function<Flux<Sensor>, Flux<Average>> calculateAverage() {
    return data -> data.window(Duration.ofSeconds(3)).flatMap(
            window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}

private Mono<Average> calculateAverage(GroupedFlux<Integer, Sensor> group) {
    return group
            .reduce(new Accumulator(0, 0),
                    (a, d) -> new Accumulator(a.getCount() + 1, a.getTotalValue() + d.getTemperature()))
            .map(accumulator -> new Average(group.key(), (accumulator.getTotalValue()) / accumulator.getCount()));
}
Run Code Online (Sandbox Code Playgroud)

The results, as I suspected, showed that the subscriber fails to proceed with the next valid messages after failing on the bad input:

2021-02-21 17:46:57.905  INFO 30702 --- [lOCpiq_lPYTgA-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.calculateAverage-in-0' has 0 subscriber(s).
2021-02-21 17:46:57.907 ERROR 30702 --- [lOCpiq_lPYTgA-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'calculateAverage'
2021-02-21 17:46:57.910 ERROR 30702 --- [lOCpiq_lPYTgA-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"hey"; line: 1, column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"hey"; line: 1, column: 4]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:110) ~[spring-cloud-stream-3.0.11.RELEASE.jar:3.0.11.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:197) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fromMessage(SimpleFunctionRegistry.java:932) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputValueIfNecessary(SimpleFunctionRegistry.java:833) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertInputPublisherIfNecessary$9(SimpleFunctionRegistry.java:772) ~[spring-cloud-function-context-3.0.13.RELEASE.jar:3.0.13.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274) ~[reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.5.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290) ~[spring-integration-amqp-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202) ~[spring-rabbit-2.2.11.RELEASE.jar:2.2.11.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-02-21 17:47:00.917 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:65)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:294)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:290)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=4, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:57 IST 2021, amqp_messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, id=6493e09a-4b4d-37f9-615d-24faa3f3b2f2, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:57 IST 2021, messageId=c6430a53-a916-524f-f436-9fa34f1ba4f9, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=4, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922417903}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 23 more

2021-02-21 17:47:03.507  INFO 30702 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-02-21 17:47:03.936 ERROR 30702 --- [lOCpiq_lPYTgA-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.calculateAverage-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}], failedMessage=GenericMessage [payload=byte[3], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=sensor, amqp_deliveryTag=5, deliveryAttempt=3, amqp_consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA, amqp_redelivered=false, amqp_receivedRoutingKey=sensor, amqp_timestamp=Sun Feb 21 17:46:58 IST 2021, amqp_messageId=b67652f5-1842-4c13-596d-295b36002217, id=4f2a90f9-5aae-0fae-507d-787694f605dc, amqp_consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, sourceData=(Body:'hey' MessageProperties [headers={}, timestamp=Sun Feb 21 17:46:58 IST 2021, messageId=b67652f5-1842-4c13-596d-295b36002217, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sensor, receivedRoutingKey=sensor, deliveryTag=5, consumerTag=amq.ctag-LqdVMYDHHTDatSifcDm9bg, consumerQueue=sensor.anonymous.Q3w6aZhbSlOCpiq_lPYTgA]), contentType=application/json, timestamp=1613922420930}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.j

Ole*_*sky 7

考虑到反应式编程的性质,这是一个棘手的问题,因此我们可能需要将此讨论纳入一个问题,因此请随意提出,但这是我的看法。

反应式函数和命令式函数之间的根本区别在于工作单元的概念。对于命令式函数,工作单元是单个消息,因此框架保持对流的持续控制,仅通过消息将其瞬间传递给函数。因此,您会理所当然地期望,无论错误发生在哪里,我们都会有一些错误处理方法 - 我们确实这样做了。

使用响应式函数,世界会完全改变,因为工作单元是整个流,而函数仅充当框架提供的流和用户定义的流操作之间的连接器。此时,sc-stream 无法控制用户的操作,因此我们的一般建议是让用户自行处理,特别是考虑到反应式 API 的丰富性,当涉及到错误处理时。但要明白,这并不是因为我们不想,而是我们不能,因为我们当时对流没有任何看法。您的问题确实相当独特,因为异常发生在您定义的步骤执行之前,特别是我们提供的类型转换。事实上,我们可以做一些事情来帮助解决这个问题,但我们仍在寻求关于这些事情应该是什么的共识,直到我们做到快速失败才是解决方案。您可以通过修复输入来克服它,因为它显然不是 JSON 和/或放宽您的函数签名Function<Flux<byte[]>, Flux<Average>>并自行处理类型转换。

不管怎样,正如你所看到的,我愿意接受建议,所以请随意提出、发布并提供你的意见。