kafka 连接转换器与转换

Nik*_*ksP 0 nginx apache-kafka apache-kafka-connect

我正在尝试创建以下工作流程

  1. nginx 日志由 kafka 连接器获取并上传到主题
  2. hdfs 同步连接器然后将这些日志放入 hdfs
  3. Hive 用于对 hdfs 数据进行分析(例如,按 IP 地址分组的访问数量等)

虽然我可以按照 hive Metastore 所需的格式排列 nginx 日志(仅限空格或逗号分隔的必填字段),但我想知道这是否可以在不触及 nginx 日志格式的情况下完成

  1. 使用类似于 org.apache.kafka.connect.json.JsonConverter 的转换器
  2. 使用单个消息转换

这两种方法都需要自定义实现,并且关于如何进行相同操作的文档很少。

哪一种是实现这一目标的正确方法?是否有任何示例可用于解析 nginx 日志输出/任何源数据,同时使用 kafka 连接将其写入主题。我正在使用独立的文件连接器。

Ran*_*uch 5

Kafka Connect 源连接器负责将消息从源中的表示(例如,nginx 日志消息格式)转换为SourceRecord使用 Kafka ConnectStructSchema数据结构的内存中表示调用。Kafka Connect 然后使用它的转换器将记录的内存表示转换byte[]为实际写入 Kafka 的表示。

这种职责分离非常重要,因为它允许您混合和匹配功能。写入主题的序列化消息的确切性质可以独立于连接器进行更改。例如,一些开发人员更喜欢使用 JSON 编写数据。许多其他人更喜欢使用通用模式注册表使用 Avro 序列化消息,这种组合可以确保所有消息使用特定模式,同时让该模式以向后兼容的方式随时间发展,以便生产者可以发展到新版本的模式而消费者可以在稍后的某个时间点适应该模式。使用 Avro 模式和模式注册表提供了巨大的好处

底线:不要创建知道上游数据源的自定义转换器。您将用太多的耦合来限制自己,因为byte[]表示将是自定义的,并且只能由也知道此特定表示的消费者和应用程序使用。

相反,如果您需要稍微调整内存中的消息结构,请使用现有的源连接器和单消息转换。在这种情况下,甚至最好创建一个自定义源连接器(可能专门用于现有的基于文件的源连接器),将 nginx 日志消息格式调整为结构化的内存表示。