带有Avro和Schema Repo的Apache Kafka - 架构ID中的消息在哪里?

jhe*_*all 20 avro apache-kafka

我想使用Avro序列化我的Kafka消息的数据,并希望将它与Avro模式存储库一起使用,因此我不必在每条消息中都包含模式.

将Avro与Kafka一起使用似乎是一件很受欢迎的事情,很多博客/ Stack Overflow问题/用户组等参考都会发送带有消息的Schema ID,但我找不到它应该去的实际示例.

我认为它应该放在某处的Kafka消息标题中,但我找不到一个明显的地方.如果它在Avro消息中,则必须根据模式对其进行解码以获取消息内容并显示需要解码的模式,这有明显的问题.

我正在使用C#客户端,但任何语言的示例都会很棒.消息类包含以下字段:

public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }
Run Code Online (Sandbox Code Playgroud)

但这些似乎没有.MessageMetaData只有Offset和PartitionId.

那么,Avro Schema Id应该去哪里?

ser*_*jja 27

模式ID实际上是在avro消息本身中编码的.看看这个,看看编解码器/解码器是如何实现的.

一般来说,当你向Kafka发送Avro消息时会发生什么:

  1. 编码器从要编码的对象获取模式.
  2. 编码器向模式注册表询问此模式的ID.如果架构已经注册,您将获得现有ID,如果没有 - 注册表将注册架构并返回新ID.
  3. 对象的编码如下:[magic byte] [schema id] [实际消息]其中magic字节只是一个0x0字节,用于区分那种消息,schema id是一个4字节的整数值,其余的是实际的编码信息.

当您将消息解码回来时,会发生什么:

  1. 解码器读取第一个字节并确保它是0x0.
  2. 解码器读取接下来的4个字节并将它们转换为整数值.这是模式ID的解码方式.
  3. 现在,当解码器具有模式ID时,它可以向模式注册表询问此id的实际模式.瞧!

如果您的密钥是Avro编码,那么您的密钥将采用上述格式.这同样适用于价值.这样,您的密钥和值可能都是Avro值并使用不同的模式.

编辑以回答评论中的问题:

实际的模式存储在模式存储库中(实际上是模式存储库的整个点 - 存储模式:)).Avro对象容器文件格式与上述格式无关.KafkaAvroEncoder/Decoder使用略有不同的消息格式(但实际消息的编码方式完全相同).

这些格式之间的主要区别在于对象容器文件包含实际模式,并且可能包含与该模式对应的多个消息,而上述格式仅包含模式ID和与该模式对应的恰好一个消息.

传递对象 - 容器 - 文件编码的消息可能不会明显地跟随/维护,因为一个Kafka消息将包含多个Avro消息.或者您可以确保一条Kafka消息只包含一条Avro消息,但这会导致每条消息都携带模式.

Avro架构可能非常大(我已经看过600 KB以上的架构)并且每条消息都带有架构会非常昂贵且浪费,因此架构存储库就会出现这种情况 - 架构只获取一次并在本地缓存而所有其他查找只是快速的地图查找.

  • 您好@serejja,您遇到过不同的lib(更受欢迎)来处理这个问题吗?我已快速审查:https://github.com/linkedin/camus/tree/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders and t's似乎像一个有趣的来源, (2认同)