Kafka:编写自定义序列化程序

Paa*_*aji 20 java customization serialization json apache-kafka

我正在尝试用Kafka 0.8.1建立一个POC.我使用自己的java类作为Kafka消息,它有一堆String数据类型.我不能使用默认的序列化程序类或Kafka库附带的String serializer类.我想我需要编写自己的序列化程序并将其提供给生产者属性.如果您知道在Kafka中编写示例自定义序列化程序(在java中),请分享.非常感谢,非常感谢.

Sam*_*rry 41

编写自定义序列化程序所需的东西是:

  1. Encoder使用为泛型指定的对象 实现
    • 需要提供VerifiableProperties构造函数
  2. 覆盖toBytes(...)方法确保返回字节数组
  3. 将序列化程序类注入 ProducerConfig

为生产者声明自定义序列化程序

正如您在问题中所述,Kafka提供了一种为生产者声明特定序列化器的方法.序列化程序类在ProducerConfig实例中设置,该实例用于构造所需的Producer类.

如果您遵循Kafka的制作人示例,您将ProducerConfig通过一个Properties对象构建.构建属性文件时,请确保包括:

props.put("serializer.class", "path.to.your.CustomSerializer");
Run Code Online (Sandbox Code Playgroud)

通过类的路径,您希望Kafka在将消息附加到日志之前用它来序列化消息.

创建Kafka理解的自定义序列化程序

编写Kafka可以正确解释的自定义序列化程序需要实现Encoder[T]Kafka提供的scala类.在java中实现traits是很奇怪的,但是以下方法适用于在我的项目中序列化JSON:

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}
Run Code Online (Sandbox Code Playgroud)

您的问题听起来好像您正在使用一个对象(让我们调用它CustomMessage)来添加到您的日志中的所有消息.如果是这种情况,您的序列化程序可能看起来更像这样:

package com.project.serializer;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}
Run Code Online (Sandbox Code Playgroud)

这将使您的属性配置看起来像这样:

props.put("serializer.class", "path.to.your.CustomSerializer");
Run Code Online (Sandbox Code Playgroud)

  • 现在我们已经序列化了对象,你如何反序列化kafka使用者中的字节数组? (3认同)
  • 一个建议:由于`ObjectMapper`的构造是一个重量级的操作,并且因为它们在构造之后是线程安全的,所以为编码器/解码器创建它的静态最终实例是有意义的.否则施工将比实际读/写时间长10倍. (3认同)
  • 感谢Sam B.非常乐于助人. (2认同)

小智 12

您需要实现编码和解码器

public class JsonEncoder implements Encoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }

        @Override
        public byte[] toBytes(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsString(object).getBytes();
            } catch (JsonProcessingException e) {
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            return "".getBytes();
        }
    }
Run Code Online (Sandbox Code Playgroud)

解码器代码

public class JsonDecoder  implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
        }
        return null;
    }
}
Run Code Online (Sandbox Code Playgroud)

pom入口

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

在Kafka属性中设置默认编码器

properties.put("serializer.class","kafka.serializer.DefaultEncoder");
Run Code Online (Sandbox Code Playgroud)

作者和读者代码如下

byte[] bytes = encoder.toBytes(map);
        KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());
Run Code Online (Sandbox Code Playgroud)