我尝试在 GitHub ( https://github.com/onurtokat/kafka-clickstream-enrich )上模拟 Gwen (Chen) Shapira 的 kafka-clickstream-enrich kafka-stream 项目。当我使用反序列化器使用消费者类消费主题时,我遇到了错误。定制的 Serde 类有序列化器和反序列化器。但是,我试图理解为什么自定义 serde 用于反序列化器,然后消费者 API 给出错误,因为它不是 org.apache.kafka.common.serialization.Deserializer 的实例
可以使用带有 Serdes.Integer() Serializer 和 new ProfileSerde() Deserializer 的 KTable 来使用该主题,如下所示。
KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
Consumed.with(Serdes.Integer(), new ProfileSerde()),
Materialized.as("profile-store"));
Run Code Online (Sandbox Code Playgroud)
定制的 Serde 被定义为;
static public final class ProfileSerde extends WrapperSerde<UserProfile> {
public ProfileSerde() {
super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
}
}
Run Code Online (Sandbox Code Playgroud)
通用的 Serde 是定制的,如下所示;
package com.onurtokat.serde;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class WrapperSerde<T> implements Serde<T> {
final private …Run Code Online (Sandbox Code Playgroud)