Avro特定与通用记录类型 - 哪个最好还是可以在之间进行转换?

Bri*_*ian 5 serialization avro

我们正在尝试在提供通用与特定记录格式以供客户消费之间做出决定,着眼于提供客户端在更新架构时可以访问的在线架构注册表.我们希望发送带有几个字节的序列化blob,这些字节表示版本号,因此可以自动从我们的注册表中检索模式.

现在,我们遇到了代码示例,说明了通用格式对架构更改的相对适应性,但我们不愿意放弃特定格式提供的类型安全性和易用性.

有没有办法获得两全其美?即我们可以在内部使用和操作特定的生成类,然后让它们在序列化之前自动将它们转换为通用记录吗?
然后,客户端将反序列化通用记录(在查找模式之后).

此外,客户可以在以后将这些通用记录转换为特定记录吗?一些小代码示例会有所帮助!

或者我们是以错误的方式看待这个?

Kra*_*tam 3

您正在寻找的是 Confluence Schema 注册表服务和库,它们有助于与其集成。

提供一个示例来使用不断发展的模式编写序列化反序列化 avro 数据。请注意提供来自 Kafka 的示例。

import io.confluent.kafka.serializers.KafkaAvroDeserializer;  
import io.confluent.kafka.serializers.KafkaAvroSerializer; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap; import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private KafkaAvroSerializer avroSerializer;
    private KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String conFluentSchemaRigistryURL) {

        //PropertiesMap
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", conFluentSchemaRigistryURL);
        // Output afterDeserialize should be a specific Record and not Generic Record
        propMap.put("specific.avro.reader", "true");

        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    public Object deserializeBytesToAvroPOJO(byte[] avroBytearray) {
        return avroDeserializer.deserialize(TOPIC, avroBytearray);
    } }
Run Code Online (Sandbox Code Playgroud)

以下课程包含您正在寻找的所有代码。io.confluence.kafka.serializers.KafkaAvroDeserializer;
io.confluence.kafka.serializers.KafkaAvroSerializer;

请点击以下链接了解更多详情:

http://bytepadding.com/big-data/spark/avro/avro-serialization-de-serialization-using-confluence-schema-registry/