在消费端通过 kafka 对 UUID 进行 Avro 自定义解码

bec*_*ert 3 java uuid avro apache-kafka

我已经编写了一个类来自定义将 UUID 类型的对象编码为要跨 kafka 和 avro 传输的字节。

为了使用这个类,我@AvroEncode(using=UUIDAsBytesEncoding.class)在目标对象中的 uuid 变量上方放置了一个。(这是由 apache avro 反射库实现的)

我很难弄清楚如何让我的消费者自动使用自定义解码器。(或者我必须进去手动解码吗?)。

这是我的 UUIDAsBytesEncoder 扩展 CustomEncoding 类:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);

            // encode length of data
            out.writeLong(16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        System.out.println("READING");
        Long size = in.readLong();
        Long leastSig = in.readLong();
        Long mostSig = in.readLong();
        return new UUID(mostSig, leastSig);
    }
}
Run Code Online (Sandbox Code Playgroud)

写方法和编码工作得很好,但读方法永远不会被反序列化调用。我将如何在消费者中实现这一点?

注册表上的架构如下所示:

{"type":"record","name":"Request","namespace":"xxxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string" },{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding" :"UUIDAsBytesEncoding"}],"default":null}]} `

如果消费者不能自动使用该信息来使用 UUIDAsBytesEncoding 读取方法,那么我将如何在我的消费者中找到标有该标签的数据?

我也在使用融合模式注册表。

任何帮助,将不胜感激!

bec*_*ert 5

最终找到了解决方案。编码不正确——内置的 writeBytes() 方法会自动为您写入长度。

然后在消费者中,我们必须通过一个 GenericDatumWriter 来做,写入一个二进制流,然后用一个 ReflectDatumReader 从二进制流中读取。这将自动调用 UUIAsBytesEncoding read() 方法并反序列化 UUID。

我的消费者看起来像这样(作为消费者组执行者服务演练的一部分):

/**
 * Start a single consumer instance
 * This will use the schema built into the IndexedRecord to decode and create key/value for the message
 */
public void run() {
    ConsumerIterator it = this.stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        try {
            String key = (String) messageAndMetadata.key();
            IndexedRecord value = (IndexedRecord) messageAndMetadata.message();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            GenericDatumWriter<Object> genericRecordWriter = new GenericDatumWriter<>(value.getSchema());
            genericRecordWriter.write(value, EncoderFactory.get().directBinaryEncoder(bytes, null));

            ReflectDatumReader<T> reflectDatumReader = new ReflectDatumReader<>(value.getSchema());
            T newObject = reflectDatumReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            IOUtils.closeQuietly(bytes);

            System.out.println("************CONSUMED:  " + key + ": "+ newObject);

        } catch(SerializationException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Shutting down Thread: " + this.threadNumber);
}
Run Code Online (Sandbox Code Playgroud)

然后新的 UUIDAsBytesEncoding 看起来像:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        // get index in union
        int index = in.readIndex();
        if (index == 1) {
            // read in 16 bytes of data
            ByteBuffer b = ByteBuffer.allocate(16);
            in.readBytes(b);

            // convert
            UUID uuid = Conversion.byteArrayToUuid(b.array(), 0);

            return uuid;
        } else {
            // no uuid present
            return null;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这也是如何实现 CustomEncoding avro 类的示例。当前版本的 avro 没有内置 UUID 序列化程序,因此这是该问题的解决方案。