Kafka-在Consumer中反序列化对象

Thi*_*iru 1 java apache-kafka spring-kafka

我们正在考虑在我们的消息传递中使用Kafka,并且我们的应用程序是使用Spring开发的。因此,我们计划使用spring-kafka。

生产者将消息作为HashMap对象放入队列。我们有JSON序列化程序,并且我们假设地图将被序列化并放入队列中。这是生产者配置。

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
        key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

另一方面,我们有一个侦听器,用于侦听生产者发布消息的相同主题。这是使用者配置:

spring:
   kafka:
       consumer:
            group-id: xyz
            key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Run Code Online (Sandbox Code Playgroud)

我们的监听器方法:

  public void listener(SomeClass abx)
Run Code Online (Sandbox Code Playgroud)

我们预期json将被反序列化,并且将生成“ SomeClass”类型的对象。但是显然,它引发了反序列化异常。

我们看到的文章很少,建议这样做是:

 @Bean
  public ConsumerFactory<String, Car> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Car.class));
  }
Run Code Online (Sandbox Code Playgroud)

我们不想编写一些代码来创建反序列化器。有没有我们所缺少的样板东西?任何帮助将不胜感激!!

Gar*_*ell 5

请参阅引导文档。特别是:

您还可以如下配置Spring Kafka JsonDeserializer:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

  • 有几种选择。您可以覆盖侦听器级别的属性 - 请参阅 https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties - 只需省略 `spring.kafka.consumer.properties`部分。或者您可以添加一个函数 - 请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-type-methods 或者,使用消息转换器,框架将检测类型从侦听器方法签名 - 请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion (2认同)