我们有一个流程,希望使用 Spring Boot 2 WebFlux 通过反应式编程来实现。目前我们没有反应式编程的经验。
作为此流程的一部分,我们将发起一个或多个 HTTP 请求(我想会使用 WebClient),并从数据库读取一些数据。
我们正在考虑使用 AWS DynamoDB,但据我了解,其 Java SDK 不支持反应式 API。
这次读取将是一个阻塞 I/O 操作。我的问题是:使用 WebFlux 实现此流程的一部分是否有好处?更一般地说,流程中的单个阻塞 I/O 操作是否会抵消我们通过反应式编程所获得的全部好处?
java nonblocking reactive-programming spring-boot spring-webflux
将我们的一项服务升级到 spring-cloud-stream 2.0.0.RC3 时,我们在尝试消费由使用旧版本 spring-cloud-stream(Ditmars.RELEASE)的服务生成的消息时遇到了异常:
ERROR 31241 --- [container-4-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.watercorp.messaging.types.incoming.UsersDeletedMessage] for GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=users, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}], failedMessage=GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=users, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) at org.springframework.integration.handler.AbstractMessageHandler.
handleMessage(AbstractMessageHandler.java:164) at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) at
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745)70) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745)70) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) …