AVRO 原始类型的 Serde 类

Dav*_*ton 2 java avro apache-kafka apache-kafka-streams confluent-platform

我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器。连接器产生以下模式:

key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname",  "type": "string"}
]}
Run Code Online (Sandbox Code Playgroud)

实际上,有几个主题,key-schema 总是“int”,value-schema 总是某种记录(用户、产品等)。我的代码包含以下定义

Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);
Run Code Online (Sandbox Code Playgroud)

起初我尝试使用类似的东西来消费这个主题, Consumed.with(Serdes.Integer(), userSerde);但这不起作用,因为 Serdes.Integer() 期望使用 4 个字节对整数进行编码,但 avro 使用可变长度编码。使用Consumed.with(Serdes.Bytes(), userSerde);有效,但我真的想要 int 而不是字节,所以我将代码更改为此

KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true); 
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
Run Code Online (Sandbox Code Playgroud)

这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)强制转换)但它允许我使用

Consumed.with(keySerde, userSerde);并获取一个整数作为键。这工作得很好,我的应用程序按预期运行(很棒!!!)。但是现在我想为键/值定义默认的 serde 并且我无法让它工作。

设置默认值 serde 很简单:

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
Run Code Online (Sandbox Code Playgroud)

但是我无法弄清楚如何定义默认键 serde。

我试过

  1. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 产生运行时错误:找不到 org.apache.kafka.common.serialization.Serdes$WrapperSerde 的公共无参数构造函数
  2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 产生运行时错误:java.lang.Integer 不能转换为 org.apache.avro.specific.SpecificRecord

我错过了什么?谢谢。

Mat*_*Sax 5

更新 (5.5 及更新版本)

Confluent 版本5.5通过PrimitiveAvroSerde(cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams /serdes/avro/PrimitiveAvroSerde.java )

原始答案 (5.4 及更早版本)

这是一个已知的问题。原始 Avro 类型不适用于 Confluent 的 AvroSerdes,因为 Serdes仅适用于GenericAvroRecordSpecificAvroRecord仅适用于。

比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro

因此,基于KafkaAvroSerializer并且构建您自己的 SerdeKafkaAvroDeserializer是正确的方法。为了能够将其作为默认 Serde 传递到配置中,您不能使用,Serdes.serdeFrom因为类型信息由于泛型类型擦除而丢失。

但是,您可以实现自己的扩展Serde接口的类,并将您的自定义类传递到配置中:

public class MySerde extends Serde<Integer> {
    // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
Run Code Online (Sandbox Code Playgroud)


小智 5

感谢@Matthias J. Sax 的提示,我想发布解决方案的工作。请随意增强它。

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {

    private final Serde<Object> inner;

    /**
     * Constructor used by Kafka Streams.
     */
    public GenericPrimitiveAvroSerDe() {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
        inner.serializer().close();
        inner.deserializer().close();

    }

    @SuppressWarnings("unchecked")
    @Override
    public Serializer<T> serializer() {
        // TODO Auto-generated method stub
        Object obj = inner.serializer();
        return (Serializer<T>) obj;

    }

    @SuppressWarnings("unchecked")
    @Override
    public Deserializer<T> deserializer() {
        // TODO Auto-generated method stub
        Object obj = inner.deserializer();
        return (Deserializer<T>) obj;

    }

}
Run Code Online (Sandbox Code Playgroud)

用作默认流配置:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
Run Code Online (Sandbox Code Playgroud)

覆盖默认值:

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                        "http://localhost:8081");
       final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
       keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
       final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
       valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
Run Code Online (Sandbox Code Playgroud)