我正在尝试用Kafka 0.8.1建立一个POC.我使用自己的java类作为Kafka消息,它有一堆String数据类型.我不能使用默认的序列化程序类或Kafka库附带的String serializer类.我想我需要编写自己的序列化程序并将其提供给生产者属性.如果您知道在Kafka中编写示例自定义序列化程序(在java中),请分享.非常感谢,非常感谢.
我想知道我们可以在 Kafka 主题中拥有哪些类型的数据。正如我在应用程序级别所知道的,这是一个键值对,这可能是语言支持的类型数据。例如,我们向主题发送一些消息,可以是一些 json、parquet 文件、序列化数据,还是我们只像使用纯文本格式一样处理消息?
谢谢你的帮助。
我是 kafka 甚至序列化的新手。到目前为止,我需要处理使用简单代码序列化的 json 格式的 kafka 事件。但现在正在使用 Avro 编码器添加额外的事件。所以现在我希望这个单一的消费者在 json 中使用 StringDeserialzer,而对于 Avro 其各自的解串器。但是如何在同一个属性文件中映射 2 个反序列化器?
private Properties getProps(){
Properties props = new Properties();
props.put("group.id", env.getProperty("group.id"));
props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
props.put("key.deserializer", env.getProperty("key.deserializer"));
props.put("value.deserializer", env.getProperty("value.deserializer"));
return props;
}//here as only value can be mapped to "key.deserializer" is there anyway to do this
Run Code Online (Sandbox Code Playgroud)
在主方法中
KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));
Run Code Online (Sandbox Code Playgroud) 我通过使用 Spring Boot 应用程序在 Kafka Producer 中将其转换为 toString() 来发送一个 JSON 数组,但我在 Consumer 中遇到以下错误:
org.springframework.kafka.listener.ListenerExecutionFailedException:无法使用传入消息调用侦听器方法 端点处理程序详细信息:方法 [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String) throws java.lang.Exception] Bean [com.springboot.service.KafkaReciever@5bb3d42d ]; 嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法处理消息;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法从 [java.lang.String] 转换为 [com.springboot.model.Student] for GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com] .springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}], failedMessage=GenericMessage.Student@3dcmodel=GenericMessage. com。
配置文件:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.boot.server}")
private String kafkaServer;
@Value("${kafka.consumer.group.id}")
private String kafkaGroupId;
@Bean
public ConsumerFactory<String, String> consumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put("message.assembler.buffer.capacity", 33554432);
props.put("max.tracked.messages.per.partition", 24);
props.put("exception.on.message.dropped", true);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); …
Run Code Online (Sandbox Code Playgroud)