spring-cloud-stream message conversion exception

Yos*_*sha 5 spring-cloud-stream

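After upgrading one of our services to spring-cloud-stream 2.0.0.RC3, we got an exception when trying to consume messages produced by a service that uses an older version of spring-cloud-stream, Ditmars.RELEASE: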

ERROR 31241 --- [container-4-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.watercorp.messaging.types.incoming.UsersDeletedMessage] for GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=users, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}], failedMessage=GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=users, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

It seems the cause is that the contentType header sent with the message is text/plain, although it should be application/json.
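To illustrate why a wrong contentType header can surface as "Cannot convert from [[B]", here is a minimal plain-Java sketch. It is a toy model and not Spring's implementation: the class `ContentTypeDispatchSketch`, the `Parsed` type, and the converter table are all hypothetical. It only assumes that the converter is chosen from the message's contentType header, so a JSON payload labelled text/plain never reaches the JSON converter.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of content-type-driven payload conversion; not Spring code. */
public class ContentTypeDispatchSketch {

    /** Hypothetical target type standing in for UsersDeletedMessage. */
    public static final class Parsed {
        public final String json;

        Parsed(String json) {
            this.json = json;
        }
    }

    // Each hypothetical converter is registered under the one MIME type it supports.
    private static final Map<String, Function<byte[], Object>> CONVERTERS = new LinkedHashMap<>();

    static {
        CONVERTERS.put("application/json",
                bytes -> new Parsed(new String(bytes, StandardCharsets.UTF_8)));
        CONVERTERS.put("text/plain",
                bytes -> new String(bytes, StandardCharsets.UTF_8));
    }

    /** Picks a converter by the contentType header; fails if the result is not of the target type. */
    public static <T> T convert(byte[] payload, Map<String, ?> headers, Class<T> target) {
        Object contentType = headers.get("contentType");
        Function<byte[], Object> converter = CONVERTERS.get(String.valueOf(contentType));
        Object result = converter == null ? null : converter.apply(payload);
        if (!target.isInstance(result)) {
            throw new IllegalStateException("Cannot convert from [" + payload.getClass().getName()
                    + "] to [" + target.getName() + "] with contentType=" + contentType);
        }
        return target.cast(result);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"userIds\":[1,2]}".getBytes(StandardCharsets.UTF_8);

        // Correct header: the JSON converter is selected and conversion succeeds.
        Parsed ok = convert(payload, Collections.singletonMap("contentType", "application/json"), Parsed.class);
        System.out.println(ok.json);

        // Wrong header: the text converter yields a String, which is not the requested type.
        try {
            convert(payload, Collections.singletonMap("contentType", "text/plain"), Parsed.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the payload bytes are identical in both calls; only the header differs, which matches the symptom in the log above, where the payload is a byte[] and the header reads contentType=text/plain.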
Producer configuration:

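spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type, message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: user-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor
        bindings:
          user-output:
            destination: users
            producer:
              partitionCount: 5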
Consumer configuration:

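spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type, message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
        default:
          binder: kafka
          contentType: application/json
          group: registration-service
          consumer:
            maxAttempts: 1
            headerMode: embeddedHeaders
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
            headerMode: embeddedHeaders
        bindings:
          user-input:
            destination: users
            consumer:
              concurrency: 5
              partitioned: true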

Consumer @StreamListener:

    @StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'")
    public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage, @Header(value = "kafka_receivedPartitionId",
            required = false) String partitionId, @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic, @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable {
        logger.info(String.format("Received users deleted message, message id: %s topic: %s partition: %s", messageId, topic, partitionId));
        handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic);
    }

Gar*_*ell 4

This is a bug in RC3; it was recently fixed on master and will be in the GA release due at the end of the month. In the meantime, can you try 2.0.0.BUILD-SNAPSHOT?

I was able to reproduce the issue, and using the snapshot fixed it for me...

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

EDIT

For completeness:

Ditmars producer:

@SpringBootApplication
@EnableBinding(Source.class)
public class So49409104Application {

    public static void main(String[] args) {
        SpringApplication.run(So49409104Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            Foo foo = new Foo();
            foo.setBar("bar");
            output.send(new GenericMessage<>(foo));
        };
    }


    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: so49409104a
          content-type: application/json
          producer:
            header-mode: embeddedHeaders

Elmhurst consumer:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So494091041Application {

    public static void main(String[] args) {
        SpringApplication.run(So494091041Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(Foo foo) {
        System.out.println(foo);
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

spring:
  cloud:
    stream:
      bindings:
        input:
          group: so49409104
          destination: so49409104a
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json

Result:

Foo [bar=bar]

The header-mode setting is required because the default in 2.0 is native, now that Kafka itself supports headers.
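The reason both sides have to agree on the header mode can be sketched in plain Java. This is a toy illustration, not the binder's actual wire format: the class `EmbeddedHeadersSketch` and its byte layout (header count, name/value pairs, then the payload) are assumptions made up for the example. The point is only that with embedded headers the record value carries the headers as well as the payload, so a consumer that expects native record headers would see neither the headers nor a clean payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy header embedding; the layout is hypothetical, not Spring Cloud Stream's format. */
public class EmbeddedHeadersSketch {

    /** Packs headers and payload into a single record value. */
    public static byte[] embed(Map<String, String> headers, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(headers.size());
            for (Map.Entry<String, String> header : headers.entrySet()) {
                out.writeUTF(header.getKey());
                out.writeUTF(header.getValue());
            }
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reverses embed(): fills headersOut and returns the original payload. */
    public static byte[] extract(byte[] record, Map<String, String> headersOut) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                headersOut.put(name, in.readUTF());
            }
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "application/json");
        headers.put("type", "USERS_DELETED");
        byte[] payload = "{\"bar\":\"bar\"}".getBytes(StandardCharsets.UTF_8);
        byte[] record = embed(headers, payload);

        // A consumer that knows about the embedding recovers headers and payload.
        Map<String, String> seen = new LinkedHashMap<>();
        byte[] body = extract(record, seen);
        System.out.println(seen + " " + new String(body, StandardCharsets.UTF_8));

        // A consumer that expects native headers would treat the whole record
        // value as the payload; it is longer because the headers are still inside.
        System.out.println("record length " + record.length + ", payload length " + payload.length);
    }
}
```

In the configurations above, setting header-mode to embeddedHeaders on the 2.0 consumer plays the role of calling `extract` before the payload is converted.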