小编Onu*_*kat的帖子

在 Kafka Consumer API 中实现 Deserializer 和 Serde 有什么区别?

我尝试在 GitHub ( https://github.com/onurtokat/kafka-clickstream-enrich )上模拟 Gwen (Chen) Shapira 的 kafka-clickstream-enrich kafka-stream 项目。当我使用反序列化器使用消费者类消费主题时,我遇到了错误。定制的 Serde 类有序列化器和反序列化器。但是,我试图理解为什么自定义 serde 用于反序列化器,然后消费者 API 给出错误,因为它不是 org.apache.kafka.common.serialization.Deserializer 的实例

可以使用带有 Serdes.Integer() Serializer 和 new ProfileSerde() Deserializer 的 KTable 来使用该主题,如下所示。

KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
                Consumed.with(Serdes.Integer(), new ProfileSerde()),
                Materialized.as("profile-store"));
Run Code Online (Sandbox Code Playgroud)

定制的 Serde 被定义为;

static public final class ProfileSerde extends WrapperSerde<UserProfile> {
        public ProfileSerde() {
            super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
        }
    }
Run Code Online (Sandbox Code Playgroud)

通用的 Serde 是定制的,如下所示;

package com.onurtokat.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;


public class WrapperSerde<T> implements Serde<T> {

    final private …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api apache-kafka-streams

2
推荐指数
1
解决办法
1766
查看次数