为 kafka 消费者使用多个反序列化器

Jay*_*Doc 3 java serialization properties avro kafka-consumer-api

我是 kafka 甚至序列化的新手。到目前为止,我需要处理使用简单代码序列化的 json 格式的 kafka 事件。但现在正在使用 Avro 编码器添加额外的事件。所以现在我希望这个单一的消费者在 json 中使用 StringDeserialzer,而对于 Avro 其各自的解串器。但是如何在同一个属性文件中映射 2 个反序列化器?

private Properties getProps(){
    Properties props = new Properties();
    props.put("group.id", env.getProperty("group.id"));
    props.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
    props.put("key.deserializer", env.getProperty("key.deserializer"));
    props.put("value.deserializer", env.getProperty("value.deserializer"));
    return props;
}//here as only value can be mapped to "key.deserializer" is there anyway to do this
Run Code Online (Sandbox Code Playgroud)

在主方法中

KafkaConsumer<String, String> _consumer = new KafkaConsumer<>(getProps());
consumers.add(_consumer);
_consumer.subscribe(new ArrayList<>(topicConsumer.keySet()));
Run Code Online (Sandbox Code Playgroud)

Slo*_*rsh 5

只需编写一个通用的反序列化器,将主题委托给匹配的反序列化器。

public class GenericDeserializer extends JsonDeserializer<Object>
{
    public GenericDeserializer()
    {
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data)
    {
        switch (topic)
        {
        case KafkaTopics.TOPIC_ONE:
            TopicOneDeserializer topicOneDeserializer = new TopicOneDeserializer();
            topicOneDeserializer.addTrustedPackages("com.xyz");
            return topicOneDeserializer.deserialize(topic, headers, data);
        case KafkaTopics.TOPIC_TWO:
            TopicTwoDeserializer topicTwoDeserializer= new TopicTwoDeserializer();
            topicTwoDeserializer.addTrustedPackages("com.xyz");
            return topicTwoDeserializer.deserialize(topic, headers, data);
        }
        return super.deserialize(topic, data);
    }
}
Run Code Online (Sandbox Code Playgroud)


Mat*_*Sax 3

您需要提供一个包含两个原始解串器的混合解串器。在内部,新的包装解串器必须能够区分两种类型的消息,并将原始字节转发到执行实际工作的正确解串器。

如果您无法提前知道您拥有什么类型的消息,您还可以尝试一种错误方法 - 即默认将其交给一个序列化器,如果失败(即抛出异常),请尝试第二个。