Onu*_*kat 2 java apache-kafka kafka-consumer-api apache-kafka-streams
我尝试在 GitHub ( https://github.com/onurtokat/kafka-clickstream-enrich )上模拟 Gwen (Chen) Shapira 的 kafka-clickstream-enrich kafka-stream 项目。当我使用反序列化器使用消费者类消费主题时,我遇到了错误。定制的 Serde 类有序列化器和反序列化器。但是,我试图理解为什么自定义 serde 用于反序列化器,然后消费者 API 给出错误,因为它不是 org.apache.kafka.common.serialization.Deserializer 的实例
可以使用带有 Serdes.Integer() Serializer 和 new ProfileSerde() Deserializer 的 KTable 来使用该主题,如下所示。
KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
Consumed.with(Serdes.Integer(), new ProfileSerde()),
Materialized.as("profile-store"));
Run Code Online (Sandbox Code Playgroud)
定制的 Serde 被定义为;
static public final class ProfileSerde extends WrapperSerde<UserProfile> {
public ProfileSerde() {
super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
}
}
Run Code Online (Sandbox Code Playgroud)
通用的 Serde 是定制的,如下所示;
package com.onurtokat.serde;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class WrapperSerde<T> implements Serde<T> {
final private Serializer<T> serializer;
final private Deserializer<T> deserializer;
public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
this.serializer = serializer;
this.deserializer = deserializer;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<T> serializer() {
return serializer;
}
@Override
public Deserializer<T> deserializer() {
return deserializer;
}
}
Run Code Online (Sandbox Code Playgroud)
我的消费者非常简单,可以在下面看到;
package com.onurtokat.consumers;
import com.onurtokat.ClickstreamEnrichment;
import com.onurtokat.Constants;
import com.onurtokat.model.UserProfile;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumeProfileData {
public static void main(String[] args) {
//prepare config
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClickstreamEnrichment.ProfileSerde.class);
KafkaConsumer<Integer, UserProfile> consumerProfileTopic = new KafkaConsumer<>(config);
consumerProfileTopic.subscribe(Arrays.asList(Constants.USER_PROFILE_TOPIC));
while (true) {
ConsumerRecords<Integer, UserProfile> records = consumerProfileTopic.poll(Duration.ofMillis(100));
for (ConsumerRecord<Integer, UserProfile> record : records) {
System.out.println(record.key() + " " + record.value());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
当我尝试与我的消费者一起消费主题时的错误是;
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
at com.onurtokat.consumers.ConsumeProfileData.main(ConsumeProfileData.java:25)
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
... 3 more
Run Code Online (Sandbox Code Playgroud)
Mic*_*oll 10
区别在于:
Serde<T>
有 aSerializer<T>
和 a Deserializer<T>
。您发布的第一个代码片段(例如 a KTable
)是 Kafka Streams 代码片段,这就是为什么它需要Serde
. Kafka Streams 需要 a ,Serde
因为它既生成消息(需要 a Serializer
)又读取消息(需要 a Deserializer
)。KafkaConsumer
)正在使用消费者客户端,因此需要 aDeserializer
而不是 a Serde
。关于:
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
... 3 more
Run Code Online (Sandbox Code Playgroud)
您的 Kafka 消费者客户端代码被赋予了Serde
它期望的Deserializer
.
归档时间: |
|
查看次数: |
1766 次 |
最近记录: |