如何将字节从Kafka转换为其原始对象?

JSR*_*R29 5 apache-kafka apache-spark spark-streaming spark-avro

我正在从Kafka提取数据,然后Array[Byte]使用默认的解码器反序列化,然后我的RDD元素如下所示(null,[B@406fa9b2)(null,[B@21a9fe0)但是我想要具有模式的原始数据,那么如何实现呢?

我以Avro格式序列化邮件。

Jac*_*ski 5

您必须使用适当的反序列化器对字节进行解码,例如字符串或自定义对象。

如果你不进行解码,你得到[B@406fa9b2的只是 Java 中字节数组的文本表示。

Kafka 对消息的内容一无所知,因此它将字节数组从生产者传递到消费者。

在 Spark Streaming 中,您必须对键和值使用序列化器(引用KafkaWordCount 示例):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
Run Code Online (Sandbox Code Playgroud)

通过上述序列化器,您可以DStream[String]使用RDD[String].

但是,如果您想直接将字节数组反序列化为自定义类,则必须编写一个自定义序列化器(这是 Kafka 特定的,与 Spark 无关)。

我建议使用具有固定模式的 JSON 或 Avro(使用Kafka、Spark 和 Avro - 第 3 部分,生成和使用 Avro 消息中描述的解决方案)。


然而,在结构化流中,管道可能如下所示:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here
Run Code Online (Sandbox Code Playgroud)