Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom Avro deserializer

Blu*_*Dog 3 scala apache-kafka apache-spark

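I am upgrading the Kafka API in my Spark Scala app to v0.10. Previously I wrote a custom method to deserialize messages, which arrive as byte strings.

I have realized that there is a way to pass StringDeserializer or ByteArrayDeserializer as the key or value deserializer parameter.

However, I can't find any information on how to create a custom Avro schema deserializer, so that my kafkaStream can use it in createDirectStream when consuming data from Kafka.

Is this possible?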

Yuv*_*kov 5

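Yes, it is possible. You need to implement the Deserializer<T> interface defined in org.apache.kafka.common.serialization, and point key.deserializer or value.deserializer at your custom class through the Kafka parameters that you pass to the ConsumerStrategy[K, V]. For example: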

import java.util

import org.apache.kafka.common.serialization.Deserializer

// Skeleton only: replace each ??? with your own logic.
class AvroDeserializer extends Deserializer[Array[Byte]] {
  override def configure(map: util.Map[String, _], b: Boolean): Unit = ???
  override def close(): Unit = ???
  override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
}
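
To make the skeleton more concrete, here is a minimal sketch of a deserializer that decodes raw Avro binary into a GenericRecord. It is an illustration under assumptions, not the only way to do it: the User schema and the names UserSchema and GenericAvroDeserializer are hypothetical, the schema is hard-coded rather than fetched from a registry, and the messages are assumed to be plain Avro binary with no framing or header bytes in front of the payload.

```scala
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical schema, for illustration only; substitute your own.
object UserSchema {
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[
      |  {"name":"name","type":"string"},
      |  {"name":"age","type":"int"}
      |]}""".stripMargin)
}

// Assumes each message value is raw Avro binary written with UserSchema.schema.
class GenericAvroDeserializer extends Deserializer[GenericRecord] {
  private val reader = new GenericDatumReader[GenericRecord](UserSchema.schema)

  // Nothing to configure in this sketch.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  // Kafka passes null for a missing value, so pass null through unchanged.
  override def deserialize(topic: String, bytes: Array[Byte]): GenericRecord =
    if (bytes == null) null
    else reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))

  // No resources to release.
  override def close(): Unit = ()
}
```

With a deserializer like this, the value type of the stream would be GenericRecord, so both type parameter lists (on createDirectStream and on Subscribe) would need to be [String, GenericRecord], and "value.deserializer" would point at classOf[GenericAvroDeserializer].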

Then:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
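import org.apache.spark.streaming.StreamingContext
// Placeholder package; `with` is a Scala keyword, so it needs backticks in a real import.
import my.location.`with`.AvroDeserializer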

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
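// The value type must match what AvroDeserializer returns (Array[Byte] here),
// and createDirectStream and Subscribe must use the same type parameters.
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)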