Vin*_*ran 11 java serialization apache-kafka
我有我的自定义Java对象,并希望利用内置序列化中的JVM将其发送到Kafka主题,但序列化失败,出现以下错误
org.apache.kafka.common.errors.SerializationException:无法将类com.spring.kafka.Payload的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.ByteArraySerializer
Payload.java
public class Payload implements Serializable {
    private static final long serialVersionUID = 123L;
    private String name="vinod";
    private int anInt = 5;
    private Double aDouble = new Double("5.0");
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAnInt() {
        return anInt;
    }
    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }
    public Double getaDouble() {
        return aDouble;
    }
    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }
}
在我创建生产者期间,我设置了以下属性
<entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
我的发送调用如下
kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));
通过生产者将自定义java对象发送到kafka主题的正确方法是什么?
Vin*_*ran 14
我们有2个选项,如下所示
1)如果我们打算向生产者发送自定义java对象,我们需要创建一个实现org.apache.kafka.common.serialization.Serializer的序列化程序,并在创建生成器期间传递该Serializer类
代码参考如下
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {
    public void configure(Map map, boolean b) {
    }
    public byte[] serialize(String s, Object o) {
       try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            return new byte[0];
        }
    }
    public void close() {
    }
}
并相应地设置值序列化器
<entry key="value.serializer"
                       value="com.spring.kafka.PayloadSerializer" />
2)无需创建自定义序列化程序类.使用现有的ByteArraySerializer,但在发送过程中遵循该过程
Java对象 - >字符串(优选JSON表示而不是toString) - > byteArray
小智 5
由于您正在使用ByteArraySerializer,您需要实例化一个 byte[] 生产者。
Producer<byte[],byte[]> producer = new KafkaProducer<>(props);
然后在序列化或其他方法后生成传递字节 [] 时,例如,
producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes()));
如果您仅将有效负载对象传递给生产者,那么最好将键序列化器和值序列化器作为您打算传递的任何内容,并且在读取时需要从该数据中读取。
使用 Serializable 和 ByteArraySerializer/ByteArrayDeserializer 是一种很好的做法。
| 归档时间: | 
 | 
| 查看次数: | 14387 次 | 
| 最近记录: |