Spring-Kafka cannot convert AVRO GenericData.Record to Acknowledgment

Hua*_*Hua · 3 · tags: json, avro, spring-boot, kafka-consumer-api

Using Spring Boot, I am trying to set up my Kafka consumer to receive messages in batch mode:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<String, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

This is what the actual consumer looks like:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
        }
    }
}

The exception I get looks like this:

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
        at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
        at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)

The Kafka messages are AVRO messages, and I would like to retrieve them as JSON strings. Is there a ready-made AVRO converter for GenericData.Record that I can plug into the ConcurrentKafkaListenerContainerFactory? Thanks!

sap*_*ati · 12

Just add the following property to your Kafka consumer configuration:

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

  • Just to round out this answer a little: if your configuration lives in a properties file, the equivalent setting is "spring.kafka.properties.specific.avro.reader: true" (5 upvotes)
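
For context, here is a minimal sketch of a consumer factory where this property would sit, assuming the Confluent KafkaAvroDeserializer is on the classpath; the bootstrap servers, group id, and schema registry URL are placeholders:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class AvroConsumerConfig {

  @Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");        // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
    // The property from this answer: deserialize into generated SpecificRecord
    // classes instead of GenericData.Record.
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    return new DefaultKafkaConsumerFactory<>(props);
  }
}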

Pra*_*r D · 5

Here is an example of how to consume messages in batch mode.

Sample Kafka demo of a batch listener with the Avro message format

The application has a custom message converter that converts Avro messages directly into POJOs. It uses schema files from the classpath; the schema files are named by the convention "topicName".avsc.
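
The SchemaRegistry helper that the converter calls into is not shown in the answer. Under the "topicName".avsc convention described above, a plausible minimal sketch (the class name and the getAvroSchema signature are inferred from the converter code below) could look like this:

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import org.apache.avro.Schema;

public class SchemaRegistry {

  // Cache of parsed schemas, keyed by topic name.
  private final Map<String, AvroSchema> cache = new ConcurrentHashMap<>();

  public AvroSchema getAvroSchema(String topic) {
    return cache.computeIfAbsent(topic, this::loadSchema);
  }

  private AvroSchema loadSchema(String topic) {
    // Schema files follow the "topicName".avsc naming convention on the classpath.
    String resource = "/" + topic + ".avsc";
    try (InputStream in = SchemaRegistry.class.getResourceAsStream(resource)) {
      if (in == null) {
        throw new IllegalStateException("No schema file found on classpath: " + resource);
      }
      return new AvroSchema(new Schema.Parser().parse(in));
    } catch (IOException e) {
      throw new IllegalStateException("Failed to read schema file: " + resource, e);
    }
  }
}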

public class AvroSchemaMessageConverter extends MessagingMessageConverter {

  private AvroMapper avroMapper;
  private SchemaRegistry schemaRegistry;
  private KafkaHeaderMapper headerMapper;


  public AvroSchemaMessageConverter(AvroMapper avroMapper, SchemaRegistry schemaRegistry) {
    this.avroMapper = avroMapper;
    this.schemaRegistry = schemaRegistry;
    if (JacksonPresent.isJackson2Present()) {
      this.headerMapper = new DefaultKafkaHeaderMapper();
    } else {
      this.headerMapper = new SimpleKafkaHeaderMapper();
    }
  }

  @Override
  protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
    System.out.println(record.value().getClass().getName());
    ByteBuffer buffer = ByteBuffer.wrap((byte[])record.value());
    JavaType javaType = TypeFactory.defaultInstance().constructType(type);
    try {
      return avroMapper.readerFor(javaType).with(schemaRegistry.getAvroSchema(record.topic()))
        .readValue(buffer.array(), buffer.arrayOffset(), buffer.limit());
    } catch (IOException e) {
      throw new ConversionException("Failed to convert AvroMessage", e);
    }
  }

  @Override
  public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
    MessageHeaders headers = message.getHeaders();
    Object topicHeader = headers.get(KafkaHeaders.TOPIC);
    String topic = null;
    if (topicHeader instanceof byte[]) {
      topic = new String(((byte[]) topicHeader), StandardCharsets.UTF_8);
    } else if (topicHeader instanceof String) {
      topic = (String) topicHeader;
    } else if (topicHeader == null) {
      Assert.state(defaultTopic != null, "With no topic header, a defaultTopic is required");
    } else {
      throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
        + topicHeader.getClass());
    }
    String actualTopic = topic == null ? defaultTopic : topic;
    Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
    Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
    Object payload = convertPayload(message, actualTopic);
    Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
    Headers recordHeaders = initialRecordHeaders(message);
    if (this.headerMapper != null) {
      this.headerMapper.fromHeaders(headers, recordHeaders);
    }
    return new ProducerRecord<>(actualTopic, partition, timestamp, key, payload,
      recordHeaders);
  }

  protected Object convertPayload(Message<?> message, String topic) {
    try {
      return avroMapper.writer(schemaRegistry.getAvroSchema(topic))
        .writeValueAsBytes(message.getPayload());
    } catch (JsonProcessingException e) {
      throw new ConversionException("Failed to convert object to AvroMessage", e);
    }
  }
}

Here is how the ConsumerFactory and KafkaListenerContainerFactory need to be configured:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
  kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // This is needed for batch listener
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;

  }

  @Bean
  public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setMessageConverter(converter());
    return kafkaTemplate;
  }

  @Bean
  public RecordMessageConverter converter() {
    return new AvroSchemaMessageConverter(avroMapper(), schemaRegistry());
  }

  @Bean
  public SchemaRegistry schemaRegistry() {
    return new SchemaRegistry();
  }

  @Bean
  public AvroMapper avroMapper() {
    AvroMapper mapper = new AvroMapper();
    mapper.configure(Feature.IGNORE_UNKNOWN, true);
    mapper.setSerializationInclusion(Include.NON_NULL);
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return mapper;
  }

}
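
With this configuration, a batch listener can declare the Acknowledgment parameter without hitting the ConverterNotFoundException from the question, because the container is in batch mode with manual acks. A minimal sketch, where the topic name and the MyPojo payload type are placeholders:

@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void consume(List<MyPojo> records, Acknowledgment ack) {
    records.forEach(record -> System.out.println(record));
    ack.acknowledge(); // required with AckMode.MANUAL
}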