Bru*_*ipa 4 java apache-kafka apache-kafka-streams
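I am working with Kafka and Kafka Streams. I wrote a custom serializer and deserializer for a KStream, which I use to receive messages from a given topic.
The problem is that I am creating the Serde like this: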
JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);
Serde<EventMessage> messageSerde = Serdes.serdeFrom(serializer, deserializer);
Serializer implementation:
public class JsonSerializer<T> implements Serializer<T> {
    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        return gson.toJson(data).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {
    }
}
Deserializer implementation:
public class JsonDeserializer<T> implements Deserializer<T> {
    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        // Fall back to the configuration when no class was passed to the constructor
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Decode with the same charset the serializer uses
        return gson.fromJson(new String(data, Charset.forName("UTF-8")), deserializedClass);
    }

    @Override
    public void close() {
    }
}
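When I try to run the code, I get the following error:
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.kafka.common.serialization.Serdes$WrapperSerde Does it have a public no-argument constructor?
The full dump is here: https://pastebin.com/WwpuXuxB
This is how I am trying to use the Serde: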
KStreamBuilder builder = new KStreamBuilder();
KStream<String, EventMessage> eventsStream = builder.stream(stringSerde, messageSerde, topic);
KStream<String, EventMessage> outStream = eventsStream
.mapValues(value -> EventMessage.build(value.type, value.timestamp));
outStream.to("output");
Also, I am not sure whether I am setting the properties correctly to configure the serializer and deserializer globally:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, messageSerde.getClass());
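To complement Matthias's answer, I wrote a simple example of how to create a custom Serde (Serializer / Deserializer) in a Kafka Streams application. It can be cloned and tried out: https://github.com/Davidcorral94/Kafka-Streams-Custom-Seder
First, I create two classes, one for the serializer and one for the deserializer. In this case, I use the Gson library to perform the serialization and deserialization.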
public class PersonSerializer implements Closeable, AutoCloseable, Serializer<Person> {
    private static final Charset CHARSET = Charset.forName("UTF-8");
    private static Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, Person person) {
        // Transform the Person object to String
        String line = gson.toJson(person);
        // Return the bytes from the String 'line'
        return line.getBytes(CHARSET);
    }

    @Override
    public void close() {
    }
}
public class PersonDeserializer implements Closeable, AutoCloseable, Deserializer<Person> {
    private static final Charset CHARSET = Charset.forName("UTF-8");
    private static Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public Person deserialize(String topic, byte[] bytes) {
        // A record with a null value has nothing to decode
        if (bytes == null) {
            return null;
        }
        try {
            // Transform the bytes to String
            String person = new String(bytes, CHARSET);
            // Return the Person object created from the String 'person'
            return gson.fromJson(person, Person.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Error reading bytes", e);
        }
    }

    @Override
    public void close() {
    }
}
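The byte handling in these two classes can be tried in isolation, without Kafka or Gson. The sketch below is only a stand-in, not the code from the repository: the names `Utf8RoundTrip`, `toBytes` and `fromBytes` are hypothetical, and the JSON is a hard-coded string. It uses an explicit UTF-8 charset in both directions and passes null through unchanged, since Kafka may hand a deserializer a null payload (for example for a tombstone record).

```java
import java.nio.charset.StandardCharsets;

public class Utf8RoundTrip {

    // Hypothetical helper: encode JSON text as UTF-8 bytes, passing null through.
    public static byte[] toBytes(String json) {
        return json == null ? null : json.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical helper: decode UTF-8 bytes back to JSON text, passing null through.
    public static String fromBytes(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00eb is a non-ASCII character, so the charset matters here
        String json = "{\"name\":\"Zo\u00eb\",\"age\":30}";
        byte[] payload = toBytes(json);

        // The decoded text equals the original because both sides use the same charset
        System.out.println("round trip ok: " + json.equals(fromBytes(payload)));
        // A null payload stays null instead of causing an exception
        System.out.println("null payload: " + fromBytes(null));
    }
}
```

Using the same explicit charset on both sides keeps the result independent of the JVM's platform default encoding.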
Then I wrap both of them in a Serde so that I can use it in my Kafka Streams app.
public class PersonSerde implements Serde<Person> {
    private PersonSerializer serializer = new PersonSerializer();
    private PersonDeserializer deserializer = new PersonDeserializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<Person> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<Person> deserializer() {
        return deserializer;
    }
}
Finally, you can use this Serde class in your Kafka Streams app with the following line:
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);
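This works with Kafka 1.0.0, the latest version available at the time of writing!
If you call Serdes.serdeFrom(...), you get back a WrapperSerde, a type intended for internal use (and WrapperSerde has no no-argument constructor). Currently there is no API you can call to obtain a custom Serde class. Instead, you need to implement your own Serde class and wrap the serializer and deserializer "manually".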
public class EventMessageSerde implements Serde<EventMessage> {
    // Final fields must be initialized; the implicit public no-arg constructor is what Kafka calls
    private final JsonSerializer<EventMessage> serializer = new JsonSerializer<>();
    private final JsonDeserializer<EventMessage> deserializer = new JsonDeserializer<>(EventMessage.class);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<EventMessage> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<EventMessage> deserializer() {
        return deserializer;
    }
}
In your Properties you can set:
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, EventMessageSerde.class);
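The failure described in this answer can be reproduced without Kafka. The sketch below assumes, as the error message suggests, that a class supplied through the configuration is instantiated reflectively via its no-argument constructor. All names in it (`NoArgConstructorDemo`, `CustomSerde`, `WrapperLike`, `canInstantiate`) are hypothetical stand-ins, not Kafka classes: `WrapperLike` plays the role of the wrapper returned by Serdes.serdeFrom(...), and `CustomSerde` the role of a hand-written Serde.

```java
public class NoArgConstructorDemo {

    // Stand-in for a hand-written Serde: it has an implicit public no-arg constructor.
    public static class CustomSerde {
    }

    // Stand-in for a wrapper type whose only constructor takes arguments.
    public static class WrapperLike {
        private final Object serializer;
        private final Object deserializer;

        public WrapperLike(Object serializer, Object deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Mimics a framework that is given only a Class and must create the instance itself.
    public static boolean canInstantiate(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // Raised, for example, when no no-argument constructor exists
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("CustomSerde instantiable: " + canInstantiate(CustomSerde.class));
        System.out.println("WrapperLike instantiable: " + canInstantiate(WrapperLike.class));
    }
}
```

A class like `WrapperLike` can still be used when an instance is passed around directly; it only fails when something has to construct it from the class alone.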