相关疑难解决方法(0)

Kafka:编写自定义序列化程序

我正在尝试用Kafka 0.8.1建立一个POC.我使用自己的java类作为Kafka消息,它有一堆String数据类型.我不能使用默认的序列化程序类或Kafka库附带的String serializer类.我想我需要编写自己的序列化程序并将其提供给生产者属性.如果您知道在Kafka中编写示例自定义序列化程序(在java中),请分享.非常感谢,非常感谢.

java customization serialization json apache-kafka

20
推荐指数
2
解决办法
2万
查看次数

Kafka 消息的数据类型

我想知道我们可以在 Kafka 主题中拥有哪些类型的数据。正如我在应用程序级别所知道的,这是一个键值对,这可能是语言支持的类型数据。例如,我们向主题发送一些消息,可以是一些 json、parquet 文件、序列化数据,还是我们只像使用纯文本格式一样处理消息?

谢谢你的帮助。

file-format apache-kafka kafka-producer-api

11
推荐指数
1
解决办法
2万
查看次数

为 kafka 消费者使用多个反序列化器

我是 kafka 甚至序列化的新手。到目前为止,我需要处理使用简单代码序列化的 json 格式的 kafka 事件。但现在正在使用 Avro 编码器添加额外的事件。所以现在我希望这个单一的消费者在 json 中使用 StringDeserialzer,而对于 Avro 其各自的解串器。但是如何在同一个属性文件中映射 2 个反序列化器?

private Properties getProps(){
    Properties props = new Properties();
    props.put("group.id", env.getProperty("group.id"));
    props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
    props.put("key.deserializer", env.getProperty("key.deserializer"));
    props.put("value.deserializer", env.getProperty("value.deserializer"));
    return props;
}//here as only value can be mapped to "key.deserializer" is there anyway to do this
Run Code Online (Sandbox Code Playgroud)

在主方法中

KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));
Run Code Online (Sandbox Code Playgroud)

java serialization properties avro kafka-consumer-api

3
推荐指数
2
解决办法
2083
查看次数

无法使用传入消息调用 Kafka 侦听器方法

我通过使用 Spring Boot 应用程序在 Kafka Producer 中将其转换为 toString() 来发送一个 JSON 数组,但我在 Consumer 中遇到以下错误:

org.springframework.kafka.listener.ListenerExecutionFailedException:无法使用传入消息调用侦听器方法 端点处理程序详细信息:方法 [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String) throws java.lang.Exception] Bean [com.springboot.service.KafkaReciever@5bb3d42d ]; 嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法处理消息;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法从 [java.lang.String] 转换为 [com.springboot.model.Student] for GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com] .springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}], failedMessage=GenericMessage.Student@3dcmodel=GenericMessage. com。

配置文件:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    @Bean
    public ConsumerFactory<String, String> consumerConfig() {

         Properties props = new Properties();

         props.put("bootstrap.servers", "localhost:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
         props.put("message.assembler.buffer.capacity", 33554432);
         props.put("max.tracked.messages.per.partition", 24);
         props.put("exception.on.message.dropped", true);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-boot kafka-consumer-api

2
推荐指数
1
解决办法
7408
查看次数