Kno*_*uch 4 apache-kafka kafka-consumer-api kafka-producer-api
我试图找到一个例子,我可以从kafka生成和订阅avro消息.
在这个时间点,我想使用"vanilla"kafka部署而没有任何汇合的附加组件.
这可能吗?到目前为止,我发现的所有示例都很快开始使用融合的特定工具来传递avro消息.
我确信应该有一种方法可以在kafka平台上发布和使用avro消息,并且任何"特定于发行版"都没有插件.
当然,你可以在没有任何Confluent工具的情况下做到这一点.但是你必须做更多的工作(例如在你的应用程序代码中) - 这是提供Avro相关工具的最初动机,例如你提到的Confluent中的工具.
一种选择是手动序列化/(来自例如反序列化卡夫卡消息的有效载荷中YourJavaPojo,以byte[]通过使用Apache阿夫罗)的Java API直接.(我想你暗示Java是首选的编程语言.)这看起来怎么样?这是一个例子.
byte[]),然后使用Kafka的Java生成器客户端将编码的有效负载写入Kafka主题.byte[]Java pojo).当然,当您使用Kafka Streams(将包含在即将推出的Apache Kafka 0.10中)或Apache Storm等流处理工具时,您也可以直接使用Avro API.
最后,您还可以选择使用一些实用程序库(无论是来自Confluent还是其他地方),这样您就不必直接使用Apache Avro API.对于它的价值,我在kafka-storm-starter上发布了一些稍微复杂的例子,例如AvroDecoderBolt.scala所证明的.在这里,Avro序列化/反序列化是通过使用Scala库Twitter Bijection完成的.这是一个AvroDecoderBolt.scala给出一般想法的示例片段:
// This tells Bijection how to automagically deserialize a Java type `T`,
// given a byte array `byte[]`.
implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
// Let's put Bijection to use.
private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
require(bytes != null, "bytes must not be null")
val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here
decodeTry match {
case Success(pojo) =>
log.debug("Binary data decoded into pojo: " + pojo)
collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
()
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
}
Run Code Online (Sandbox Code Playgroud)
所以是的,您当然可以选择不使用任何其他库,例如Confluent的Avro序列化器/反序列化器(目前作为confluentinc/schema-registry的一部分提供)或Twitter的Bijection.是否值得额外的努力取决于你自己决定.
| 归档时间: |
|
| 查看次数: |
1304 次 |
| 最近记录: |