Kafka Connect:如何使用hdfs sink连接器将Kafka主题的protobuf数据发送到HDFS?

NoN*_*ame 5 apache-kafka apache-kafka-connect

我有一个生产者正在为一个主题制作protobuf消息.我有一个消费者应用程序,它反序列化protobuf消息.但是hdfs sink连接器直接从Kafka主题中获取消息.将键和值转换器etc/schema-registry/connect-avro-standalone.properties设置为什么?最好的方法是什么?提前致谢!

Ewe*_*ava 4

Kafka Connect 旨在通过转换器的概念将 Kafka 中的序列化格式问题与各个连接器分开。正如您似乎发现的那样,您需要调整key.convertervalue.converter类以支持 protobufs 的实现。这些类通常被实现为普通的 Kafka 反序列化器,然后执行从序列化特定的运行时格式(例如 protobuf 中的消息)到 Kafka Connect 的运行时 API(没有任何关联的序列化格式 - 它只是一个Java 类型集和一个用于定义模式的类)。

我不知道现有的实现。实现这一点的主要挑战是 protobufs 是自描述的(即您可以反序列化它而无需访问原始模式),但由于它的字段只是整数 ID,如果没有 a) 要求,您可能不会获得有用的模式信息转换器可以使用特定模式,例如通过配置(这使得迁移模式更加复杂)或 b) 模式注册表服务 + 数据的包装格式,允许您动态查找模式。

  • Kafka Connect 绝对不想限制支持格式的数量。恰恰相反,我们包含了转换器,并使连接器的数据 API 变得通用,以支持插入不同的序列化格式。protobuf 实现绝对是有价值的,我建议发布它。我们愿意将其与 AvroConverter 一起包含在我们的存储库中,尽管我们希望获得相当完整的实现。我看到的最大问题是,为了获得有用的实现,我希望您需要类似于模式注册表的东西。 (3认同)
  • 我有一个几乎不起作用的实现。我使用“avro-protobuf”扩展了“AvroConter”类的“Deserializer”。据我所知,Kafka Connect 希望限制支持格式的数量,即 JSON 和 Avro,因此我不会按原样发布它。话又说回来,我不想将整个“avro-converter”复制并重命名为“protobuf-converter”。贡献该项目的最佳方法是什么? (2认同)