为什么将 Avro 与 Kafka 一起使用 - 如何处理 POJO

adp*_*pap 9 java avro apache-kafka confluent-schema-registry

我有一个 spring 应用程序,它是我的 kafka 制作人,我想知道为什么 avro 是最好的方法。我阅读了它以及它所提供的所有内容,但是为什么我不能将我用 jackson 创建的 POJO 序列化并将其发送到 kafka?

我这么说是因为 avro 的 POJO 生成不是那么简单。最重要的是,它需要 maven 插件和一个 .avsc 文件。

因此,例如我在我的 kafka 生产者上有一个 POJO,名为 User:

public class User {

    private long    userId;

    private String  name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

}
Run Code Online (Sandbox Code Playgroud)

我将它序列化并将其发送到我在 kafka 中的用户主题。然后我有一个消费者,它本身有一个 POJO 用户并反序列化消息。是空间问题吗?以这种方式序列化和反序列化不是也更快吗?更不用说维护模式注册表的开销了。

cri*_*007 9

你不需要 AVSC,你可以使用一个 AVDL 文件,它基本上看起来和一个只有字段的 POJO 一样

@namespace("com.example.mycode.avro")
protocol ExampleProtocol {
   record User {
     long id;
     string name;
   }
}
Run Code Online (Sandbox Code Playgroud)

其中,当使用idl-protocolMaven 插件的目标时,将为您创建这个 AVSC,而不是您自己编写。

{
  "type" : "record",
  "name" : "User",
  "namespace" : "com.example.mycode.avro",
  "fields" : [ {
    "name" : "id",
    "type" : "long"
  }, {
    "name" : "name",
    "type" : "string"
  } ]
}
Run Code Online (Sandbox Code Playgroud)

它还会在您的类路径上放置一个SpecificDataPOJO User.java,以便在您的代码中使用。


如果您已经拥有 POJO,则无需使用 AVSC 或 AVDL 文件。有一些库可以转换 POJO。例如,您可以使用 Jackson,这不仅适用于 JSON,您可能只需要JacksonAvroSerializer为 Kafka创建一个,例如,或者查找是否存在。

Avro 也有基于反射的内置库


那么问题来了 -为什么是 Avro(对于 Kafka)?

好吧,拥有架构是一件好事。想想 RDBMS 表,你可以解释表,你会看到所有的列。转向 NoSQL 文档数据库,它们实际上可以包含任何内容,这就是 Kafka 的 JSON 世界。

假设您的 Kafka 集群中有消费者不知道主题中的内容,他们必须确切知道谁/什么已经生成到主题中。他们可以试试console consumer,如果是像JSON这样的明文,那么他们必须找出一些他们感兴趣的字段,然后.get("name")一次又一次地执行类似HashMap的片状操作,只有当一个字段没有时才遇到NPE不存在。使用 Avro,您可以清楚地定义默认值和可为空的字段。

不需要使用架构注册表,但它为explain topicRDBMS 类比提供了这种类型的语义。它还使您无需随每条消息一起发送模式,以及 Kafka 主题的额外带宽费用。注册表不仅对 Kafka 有用,因为它可以用于 Spark、Flink、Hive 等,用于围绕流数据摄取的所有数据科学分析。


假设您确实想使用 JSON,然后尝试改用 MsgPack,您可能会看到 Kafka 吞吐量增加并节省了代理的磁盘空间


你也可以使用其他格式,比如 Protobuf 或 Thrift,就像 Uber 比较过的那样


Gio*_*ous 5

这是速度和存储的问题。序列化数据时,您通常需要传输实际模式,因此,这会导致有效负载大小的增加。

                            Total Payload Size
+-----------------+--------------------------------------------------+
|     Schema      |                 Serialised Data                  |
+-----------------+--------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

模式注册表提供模式和元数据的集中存储库,以便所有模式都在中央系统中注册。这个集中式系统使生产者能够仅包含模式的 ID,而不是完整的模式本身(以文本格式)。

                      Total Payload Size
+----+--------------------------------------------------+
| ID |                 Serialised Data                  |
+----+--------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

因此,序列化变得更快。

此外,架构注册表版本控制可以强制执行数据策略,这可能有助于防止较新的架构破坏与现有版本的兼容性,从而可能导致管道中的停机或任何其他重大问题。


Confluence 的这篇文章 彻底解释了 Schema Registry 的更多好处。