Spring Kafka: No type information in headers and no default type provided

mco*_*ner 7 apache-kafka spring-boot spring-kafka

I have a Spring Boot application that defines:

  • A REST controller that writes to the Kafka topic STREAM_TOPIC_IN_QQQ
  • A KafkaListener that reads from STREAM_TOPIC_IN_QQQ (groupId = "bar") and logs the record
  • A KStream that peeks at the topic and logs it, maps it to another type, and writes it to STREAM_TOPIC_OUT_QQQ
  • Another KafkaListener that reads from STREAM_TOPIC_OUT_QQQ.

(I have changed the topic suffix to avoid any possible confusion, and I create the topics by hand; otherwise I get a LEADER_NOT_AVAILABLE warning for STREAM_TOPIC_IN_XXX and the stream does not run for a minute or so.)

The first listener and the stream appear to be working, but when the listener on STREAM_OUT_TOPIC tries to deserialize a message, I get the exception below. I am supplying the serde in the stream with Produced.with. What do I need to do so that the listener knows the type to deserialize to?

Log output:

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]

Here is the configuration:

REST (Spring MVC):

@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}

Kafka configuration (Spring Kafka):

@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
        logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
    })
          .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
          .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    streams:
      application-id: kafka9000-v0.1
      properties: # properties not explicitly handled by KafkaProperties.streams
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: com.teramedica.kafakaex001web.model

Amr*_*bhu 14

So I was facing the same issue.

This is how I fixed it.

You have to set the following property to the class you are trying to deserialize:

spring.json.value.default.type=com.something.model.TransactionEventPayload

I set it in the properties of the @KafkaListener like this:

@KafkaListener(topics = "topic", groupId = "myGroupId",
        properties = {"spring.json.value.default.type=com.something.model.TransactionEventPayload"})
public void consumeTransactionEvent(@Payload TransactionEventPayload payload,
                                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
                                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
    // handle the payload here
}
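
As an alternative to per-listener properties, here is a minimal sketch of my own (not part of this answer) that gives the JsonDeserializer its target class directly when building a dedicated consumer factory; the bean names and the use of GreetingResponse from the question are assumptions:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.teramedica.kafakaex001web.model.GreetingResponse;

@Configuration
public class GreetingResponseConsumerConfig {

    @Bean
    public ConsumerFactory<String, GreetingResponse> greetingResponseConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "oofda");

        // A JsonDeserializer constructed with a target class needs no type headers;
        // it always deserializes values into GreetingResponse.
        JsonDeserializer<GreetingResponse> valueDeserializer =
                new JsonDeserializer<>(GreetingResponse.class);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GreetingResponse> greetingResponseListenerFactory(
            ConsumerFactory<String, GreetingResponse> greetingResponseConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, GreetingResponse> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingResponseConsumerFactory);
        return factory;
    }
}

A listener would then point at it with containerFactory = "greetingResponseListenerFactory" on its @KafkaListener annotation.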


小智 12

This exception is thrown by org.springframework.kafka.support.serializer.JsonDeserializer, which requires the type information to be present in a special type header, or a default type to be supplied via the spring.json.value.default.type configuration property (for example on the @KafkaListener).
This is how I solved it with Spring Boot 2.5.3:

  1. Add a ByteArrayJsonMessageConverter to the context:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;

@Configuration
public class JsonMessageConverterConfig {
    @Bean
    public JsonMessageConverter jsonMessageConverter() {
        return new ByteArrayJsonMessageConverter();
    }
}
  2. Set app.kafka.producer.value-serializer and app.kafka.consumer.value-deserializer:
app.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
app.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
  3. Now you can disable serialization of the TypeId header:
spring.kafka.producer.properties.spring.json.add.type.headers=false
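
With that configuration in place, a usage sketch of my own (not part of the answer; TransactionEventPayload is the assumed payload class from the previous answer): the ByteArrayDeserializer hands the raw JSON bytes to the listener container, and the ByteArrayJsonMessageConverter converts them using the listener method's parameter type as the target, so no type headers are needed.

@KafkaListener(topics = "topic", groupId = "myGroupId")
public void consumeTransactionEvent(TransactionEventPayload payload) {
    // payload has already been converted from JSON by the ByteArrayJsonMessageConverter
    logger.info("Received payload: {}", payload);
}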


mco*_*ner 9

I am "answering" my own question mainly to consolidate the information from @GaryRussell's comments, but basically, he provided the best answer. In short, I did the following:

  • Set the consumer deserializer to StringDeserializer
  • Add a messageConverter bean as a StringJsonMessageConverter
  • In the @KafkaListener-annotated methods, just use a @Payload of the expected type
  • If a ConsumerRecord is used in a @KafkaListener-annotated method, do not expect it to be of the payload type. It will now be a String (because the message converter, not the deserializer, is doing the conversion).

One other thing: by default, when using Spring Boot auto-configuration, simply adding the messageConverter also adds it to the auto-configured kafkaTemplate. This does not seem to be a problem when calling kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", greeting), but I think it could be a problem if send(Message) is used.

Below is a working configuration, in that I get the messages as expected with minimal configuration.

application.yml:

  spring:
    kafka:
      bootstrap-servers:  localhost:9092
      consumer:
        group-id: foo
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      streams:
        application-id: kafka9000-v0.1
        properties: # properties not explicitly handled by KafkaProperties.streams
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model

Kafka configuration:

    @Bean
    public RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

...
    @Bean
    public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
        stream.peek((k, greeting) -> {
            logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
        })
              .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
              .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
        return stream;
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
    public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
    //    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
        logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}", gr);
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
    public void listenForGreetingResponses(@Payload Greeting gr,
            ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
        //logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
        logger.info("STREAM_IN_TOPIC Listener:   Greeting: {}", gr.getContent());
        logger.info("STREAM_IN_TOPIC Listener:  From Headers: topic: {}, partition: {}, key: {}", topic, partition,
                    key);
        logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
                    record.topic(), record.partition(), record.key());
        logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
    }

    @Bean
    public KafkaListenerErrorHandler myTopicErrorHandler() {
        return (m, e) -> {
            logger.error("Got an error {}", e.getMessage());
            return "some info about the failure";
        };
    }

The output for a message is:

13 Mar 2019 09:56:57,884   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
13 Mar 2019 09:56:57,913   INFO     [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer]   Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Greeting: Hello, World!
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:  From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself


Gar*_*ell 8

See the documentation.

Specifically...

JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

It is spring.json.value.default.type.

You can also set spring.json.use.type.headers (default true) to prevent even looking for headers.

The deserializer automatically trusts the package of the default type, so it is not necessary to add it there.
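
As a small sketch of my own (not from the answer), both settings map to constants on JsonDeserializer and can be placed in the consumer configuration programmatically; GreetingResponse from the question is assumed as the fallback type:

import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.support.serializer.JsonDeserializer;

Map<String, Object> jsonConsumerProps() {
    Map<String, Object> consumerProps = new HashMap<>();
    // Fallback type used when no type header is present:
    consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
            "com.teramedica.kafakaex001web.model.GreetingResponse");
    // Do not even look for the __TypeId__ header:
    consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
    return consumerProps;
}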

EDIT

However, also see [Spring Messaging Message Conversion](https://docs.spring.io/spring-kafka/reference/html/#spring-messaging-message-conversion).

Using a `BytesDeserializer` and a `BytesJsonMessageConverter`, the framework will pass the method argument type as the conversion target.
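
A minimal sketch of that approach (my own; it assumes Boot auto-configuration and reuses GreetingResponse and K9000Consts from the question): declare the converter bean, switch the consumer value deserializer to BytesDeserializer, and let the listener parameter type drive the conversion.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.stereotype.Component;

@Configuration
class BytesConverterConfig {

    // Boot auto-configuration wires a converter @Bean into the listener container factory.
    @Bean
    public RecordMessageConverter messageConverter() {
        return new BytesJsonMessageConverter();
    }
}

// Consumer side, in application.yml (shown as a comment to keep a single language here):
//   spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.BytesDeserializer

@Component
class GreetingResponseBytesListener {

    private static final org.slf4j.Logger logger =
            org.slf4j.LoggerFactory.getLogger(GreetingResponseBytesListener.class);

    // The BytesJsonMessageConverter uses the parameter type (GreetingResponse)
    // as the JSON conversion target, so no __TypeId__ header is required.
    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId = "oofda")
    public void listen(GreetingResponse response) {
        logger.info("Got a GreetingResponse: {}", response);
    }
}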

  • In my opinion, the `BytesDeserializer` / `BytesJsonMessageConverter` solution is the one most people will want to use, since using headers to decide the type is only helpful for Spring, or at least Java, producers. (3 upvotes)
  • Ah - OK - use a `StringDeserializer` (or `BytesDeserializer`) together with a `StringJsonMessageConverter` (or `BytesJsonMessageConverter`) instead of the `JsonDeserializer` (the `Bytes...` versions are more efficient). The framework then tells the converter the type it finds on the listener method. Again, see the [documentation](https://docs.spring.io/spring-kafka/reference/html/#spring-messaging-message-conversion). (2 upvotes)
  • If Boot auto-configuration finds a converter `@Bean`, it automatically wires it into the container factory. (2 upvotes)