Sam*_*amy 5 apache-kafka-streams
我需要创建一个使用String键HashMap作为值的状态存储.我尝试了以下两种方法.
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();
// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();
Run Code Online (Sandbox Code Playgroud)
代码编译没有任何错误,但我得到一个运行时错误
io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
Run Code Online (Sandbox Code Playgroud)
有人可以建议我创建一个状态商店的正确方法是什么?
如果要创建状态存储,则需要为要使用的类型提供序列化程序和反序列化程序类.在Kafka Stream中,有一个名为Serde的抽象,它将序列化器和反序列化器包装在一个类中.
如果你使用.withValues(Class<K> keyClass)它必须持有它
@param keyClass键的类,它必须是Kafka内置serdes的类型之一
因为没有内置Serdes的HashMap,你需要实现一个第一(也许叫HashMapSerde),并给这个类的方法.withValues(Serde<K> keySerde).此外,您还必须实现实际的序列化器和反序列化器HashMap.如果您知道HashMap的泛型类型,则应指定它们(使串行器和解串器的实现更加简单的原因.
像这样的东西(只是一个草图;省略了泛型类型):
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
public class HashMapSerde implements Serde<HashMap> {
void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
void close() {
/* put your code here */
}
Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public byte[] serialize(String topic, T data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public T deserialize(String topic, byte[] data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
}
Run Code Online (Sandbox Code Playgroud)
如果您想查看如何实现(de)序列化程序的示例,请Serde查看https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/ common/serialization和https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
| 归档时间: |
|
| 查看次数: |
2110 次 |
| 最近记录: |