从 Kafka 消费者反序列化 Java 对象

Mat*_*att 3 java kryo apache-kafka

我有一个 Kafka Consumer,当前配置为:

kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Run Code Online (Sandbox Code Playgroud)

但我真正想要的是能够使用 Kryo 反序列化器来代替:

public class KryoPOJODeserializer<T> implements Deserializer<T> {

    private Kryo kryo = new Kryo();

    @Override
    public void configure(Map props, boolean isKey) {
        kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Deserialize the serialized object.
        return kryo.readObject(new Input(data), T.class);
    }

    @Override
    public void close() {

    }

}
Run Code Online (Sandbox Code Playgroud)

我不明白的是,是否可以为不同的主题重用相同的 Consumer(每个主题都有不同类型的 POJO)?如果我的消费者配置是:

kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());
Run Code Online (Sandbox Code Playgroud)

或者,我是否必须为每个主题都有一个单独的消费者?

或者,我是否必须删除反序列化器的泛型部分,始终返回一个对象,并将该对象转换为客户端代码中的相关 POJO?就像是:

public class KryoPOJODeserializer implements Deserializer {

    private Kryo kryo = new Kryo();

    @Override
    public void configure(Map props, boolean isKey) {
        kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        // Deserialize the serialized object.
        return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
    }

    @Override
    public void close() {

    }

}
Run Code Online (Sandbox Code Playgroud)

后者可以工作,但感觉有点脏。

任何建议表示赞赏!

Jak*_*rab 6

您可以通过将Deserializer实例直接传递给 Consumer 来使用原始方法:

KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties,
    new StringDeserializer(), new KryoPOJODeserializer(Foo.class));
Run Code Online (Sandbox Code Playgroud)

如果您想为多个主题重用相同的传入数据类型,那么您可以使用单个使用者设置对这些主题的订阅。如果您想要不同的对象类型的值,那么您将需要使用多个使用者。

否则你的第二种方法也是有效的。

  • 是的,这是可以接受的 - 消费者(无论消息传递技术如何)通常被编写为处理一种特定的有效负载类型,尽管总是有例外。至于 KafkaConsumer 是否“重”,取决于你的意思。消费者将维护与它正在消费消息的所有代理的 TCP 连接。除此之外,没有线程池在后台执行操作。 (2认同)